diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index 1d58012becd3..7281deec0cfb 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -116,8 +116,7 @@ func TestConverterFlushesBatches(t *testing.T) { } kvCh := make(chan row.KVBatch, batchSize) - conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, - nil /* seqChunkProvider */) + conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */) if err != nil { t.Fatalf("makeInputConverter() error = %v", err) } @@ -953,7 +952,8 @@ func pgDumpFormat() roachpb.IOFileFormat { return roachpb.IOFileFormat{ Format: roachpb.IOFileFormat_PgDump, PgDump: roachpb.PgDumpOptions{ - MaxRowSize: 64 * 1024, + MaxRowSize: 64 * 1024, + IgnoreUnsupported: true, }, } } diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 3276a22b188e..70e333b31231 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -9,6 +9,7 @@ package importccl import ( + "bytes" "context" "fmt" "io/ioutil" @@ -100,6 +101,12 @@ const ( avroSchema = "schema" avroSchemaURI = "schema_uri" + pgDumpIgnoreAllUnsupported = "ignore_unsupported" + pgDumpIgnoreShuntFileDest = "ignored_stmt_log" + pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts" + pgDumpUnsupportedDataStmtLog = "unsupported_data_stmts" + pgDumpMaxLoggedStmts = 10 + // RunningStatusImportBundleParseSchema indicates to the user that a bundle format // schema is being parsed runningStatusImportBundleParseSchema jobs.RunningStatus = "parsing schema on Import Bundle" @@ -135,6 +142,9 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{ avroRecordsSeparatedBy: sql.KVStringOptRequireValue, avroBinRecords: sql.KVStringOptRequireNoValue, avroJSONRecords: sql.KVStringOptRequireNoValue, + + pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue, + pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue, } func makeStringSet(opts ...string) map[string]struct{} { @@ -164,7 +174,8 @@ var mysqlOutAllowedOptions = makeStringSet( ) var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit) var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize) -var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit) +var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit, + pgDumpIgnoreAllUnsupported, pgDumpIgnoreShuntFileDest) // DROP is required because the target table needs to be taken offline during // IMPORT INTO. @@ -615,6 +626,16 @@ func importPlanHook( maxRowSize = int32(sz) } format.PgDump.MaxRowSize = maxRowSize + if _, ok := opts[pgDumpIgnoreAllUnsupported]; ok { + format.PgDump.IgnoreUnsupported = true + } + + if dest, ok := opts[pgDumpIgnoreShuntFileDest]; ok { + if !format.PgDump.IgnoreUnsupported { + return errors.New("cannot log unsupported PGDUMP stmts without `ignore_unsupported` option") + } + format.PgDump.IgnoreUnsupportedLog = dest + } if override, ok := opts[csvRowLimit]; ok { rowLimit, err := strconv.Atoi(override) @@ -1295,6 +1316,102 @@ func (r *importResumer) ReportResults(ctx context.Context, resultsCh chan<- tree } } +type loggerKind int + +const ( + schemaParsing loggerKind = iota + dataIngestion +) + +// unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL +// statements seen during the import.
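+// +// As a usage sketch (the URIs here are illustrative, not part of this change), an import that opts into this behavior might be invoked as: +// +// IMPORT PGDUMP 'userfile:///dump.sql' +// WITH ignore_unsupported, ignored_stmt_log='userfile:///unsupported.log' +// +// in which case up to pgDumpMaxLoggedStmts ignored statements per phase are +// written to the given destination instead of failing the IMPORT.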
+type unsupportedStmtLogger struct { + // Values are initialized based on the options specified in the IMPORT PGDUMP + // stmt. + ignoreUnsupported bool + ignoreUnsupportedLogDest string + externalStorage cloud.ExternalStorageFactory + + // logBuffer holds the string to be flushed to the ignoreUnsupportedLogDest. + logBuffer *bytes.Buffer + numIgnoredStmts int + + loggerType loggerKind +} + +func makeUnsupportedStmtLogger( + ignoreUnsupported bool, + unsupportedLogDest string, + loggerType loggerKind, + externalStorage cloud.ExternalStorageFactory, +) *unsupportedStmtLogger { + l := &unsupportedStmtLogger{ + ignoreUnsupported: ignoreUnsupported, + ignoreUnsupportedLogDest: unsupportedLogDest, + loggerType: loggerType, + logBuffer: new(bytes.Buffer), + externalStorage: externalStorage, + } + header := "Unsupported statements during schema parse phase:\n\n" + if loggerType == dataIngestion { + header = "Unsupported statements during data ingestion phase:\n\n" + } + l.logBuffer.WriteString(header) + return l +} + +func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) { + // We have already logged parse errors during the schema parsing phase, so + // skip them to avoid duplicate entries. + skipLoggingParseErr := isParseError && u.loggerType == dataIngestion + if u.ignoreUnsupportedLogDest == "" || skipLoggingParseErr { + return + } + + if u.numIgnoredStmts < pgDumpMaxLoggedStmts { + if isParseError { + logLine = fmt.Sprintf("%s: could not be parsed\n", logLine) + } else { + logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine) + } + u.logBuffer.Write([]byte(logLine)) + } + u.numIgnoredStmts++ +} + +func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error { + if u.ignoreUnsupportedLogDest == "" { + return nil + } + + numLoggedStmts := pgDumpMaxLoggedStmts + if u.numIgnoredStmts < pgDumpMaxLoggedStmts { + numLoggedStmts = u.numIgnoredStmts + } + u.logBuffer.WriteString(fmt.Sprintf("\nLogging %d out of %d ignored statements.\n", + numLoggedStmts, u.numIgnoredStmts)) + + conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user) + if err != nil { + return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP") + } + var s cloud.ExternalStorage + if s, err = u.externalStorage(ctx, conf); err != nil { + return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP") + } + defer s.Close() + + logFileName := pgDumpUnsupportedSchemaStmtLog + if u.loggerType == dataIngestion { + logFileName = pgDumpUnsupportedDataStmtLog + } + err = s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())) + if err != nil { + return errors.Wrap(err, "failed to write unsupported stmts to the log during IMPORT PGDUMP") + } + return nil +} + // parseAndCreateBundleTableDescs parses and creates the table // descriptors for bundle formats. func parseAndCreateBundleTableDescs( @@ -1344,7 +1461,19 @@ func parseAndCreateBundleTableDescs( tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime) case roachpb.IOFileFormat_PgDump: evalCtx := &p.ExtendedEvalContext().EvalContext - tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner) + + // Set up a logger to handle unsupported DDL statements in the PGDUMP file.
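+ // The logger only buffers statements in memory; the flush call below writes + // the buffer to format.PgDump.IgnoreUnsupportedLog and is a no-op when no + // log destination was specified.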
+ unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported, + format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage) + + tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, + walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger) + + logErr := unsupportedStmtLogger.flush(ctx, p.User()) + if logErr != nil { + return nil, logErr + } + default: return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String()) } diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 032ade9d296a..52b7a595ed9d 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -819,6 +819,7 @@ END; name: "fk", typ: "PGDUMP", data: testPgdumpFk, + with: "WITH ignore_unsupported", query: map[string][][]string{ getTablesQuery: { {"public", "cities", "table"}, @@ -897,7 +898,7 @@ END; name: "fk-skip", typ: "PGDUMP", data: testPgdumpFk, - with: `WITH skip_foreign_keys`, + with: `WITH skip_foreign_keys, ignore_unsupported`, query: map[string][][]string{ getTablesQuery: { {"public", "cities", "table"}, @@ -912,13 +913,14 @@ END; name: "fk unreferenced", typ: "TABLE weather FROM PGDUMP", data: testPgdumpFk, + with: "WITH ignore_unsupported", err: `table "cities" not found`, }, { name: "fk unreferenced skipped", typ: "TABLE weather FROM PGDUMP", data: testPgdumpFk, - with: `WITH skip_foreign_keys`, + with: `WITH skip_foreign_keys, ignore_unsupported`, query: map[string][][]string{ getTablesQuery: {{"public", "weather", "table"}}, }, @@ -937,6 +939,7 @@ END; { name: "sequence", typ: "PGDUMP", + with: "WITH ignore_unsupported", data: ` CREATE TABLE t (a INT8); CREATE SEQUENCE public.i_seq @@ -965,6 +968,7 @@ END; INSERT INTO "bob" ("c", "b") VALUES (3, 2); COMMIT `, + with: `WITH ignore_unsupported`, query: map[string][][]string{ `SELECT * FROM bob`: { {"1", "NULL", "2"}, @@ -1021,29 +1025,6 @@ END; data: "create table s.t (i INT8)", err: `non-public schemas unsupported: s`, }, - { - name: "various create ignores", - typ: "PGDUMP", - data: ` - CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at(); - REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC; - REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database; - GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database; - GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly; - - CREATE FUNCTION public.isnumeric(text) RETURNS boolean - LANGUAGE sql - AS $_$ - SELECT $1 ~ '^[0-9]+$' - $_$; - ALTER FUNCTION public.isnumeric(text) OWNER TO roland; - - CREATE TABLE t (i INT8); - `, - query: map[string][][]string{ - getTablesQuery: {{"public", "t", "table"}}, - }, - }, { name: "many tables", typ: "PGDUMP", @@ -1600,7 +1581,8 @@ func TestImportRowLimit(t *testing.T) { expectedRowLimit := 4 // Import a single table `second` and verify number of rows imported. - importQuery := fmt.Sprintf(`IMPORT TABLE second FROM PGDUMP ($1) WITH row_limit="%d"`, expectedRowLimit) + importQuery := fmt.Sprintf(`IMPORT TABLE second FROM PGDUMP ($1) WITH row_limit="%d",ignore_unsupported`, + expectedRowLimit) sqlDB.Exec(t, importQuery, second...) var numRows int @@ -1611,7 +1593,7 @@ func TestImportRowLimit(t *testing.T) { // Import multiple tables including `simple` and `second`. 
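// Note that row_limit applies per table, so each table imported from the // multitable dump should contain at most expectedRowLimit rows.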
expectedRowLimit = 3 - importQuery = fmt.Sprintf(`IMPORT PGDUMP ($1) WITH row_limit="%d"`, expectedRowLimit) + importQuery = fmt.Sprintf(`IMPORT PGDUMP ($1) WITH row_limit="%d",ignore_unsupported`, expectedRowLimit) sqlDB.Exec(t, importQuery, multitable...) sqlDB.QueryRow(t, "SELECT count(*) FROM second").Scan(&numRows) require.Equal(t, expectedRowLimit, numRows) @@ -5534,14 +5516,14 @@ func TestImportPgDump(t *testing.T) { CONSTRAINT simple_pkey PRIMARY KEY (i), UNIQUE INDEX simple_b_s_idx (b, s), INDEX simple_s_idx (s) - ) PGDUMP DATA ($1)`, + ) PGDUMP DATA ($1) WITH ignore_unsupported`, simple, }, - {`single table dump`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1)`, simple}, - {`second table dump`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1)`, second}, - {`simple from multi`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1)`, multitable}, - {`second from multi`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1)`, multitable}, - {`all from multi`, expectAll, `IMPORT PGDUMP ($1)`, multitable}, + {`single table dump`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1) WITH ignore_unsupported`, simple}, + {`second table dump`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1) WITH ignore_unsupported`, second}, + {`simple from multi`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1) WITH ignore_unsupported`, multitable}, + {`second from multi`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1) WITH ignore_unsupported`, multitable}, + {`all from multi`, expectAll, `IMPORT PGDUMP ($1) WITH ignore_unsupported`, multitable}, } { t.Run(c.name, func(t *testing.T) { sqlDB.Exec(t, `DROP TABLE IF EXISTS simple, second`) @@ -5681,6 +5663,137 @@ func TestImportPgDump(t *testing.T) { }) } +func TestImportPgDumpIgnoredStmts(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1 /* nodes */, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + conn := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(conn) + + data := ` + -- Statements that CRDB cannot parse. + CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at(); + + REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC; + REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database; + + GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database; + GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly; + + COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language'; + CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog; + + -- Valid statement. + CREATE TABLE foo (id INT); + + CREATE FUNCTION public.isnumeric(text) RETURNS boolean + LANGUAGE sql + AS $_$ + SELECT $1 ~ '^[0-9]+$' + $_$; + ALTER FUNCTION public.isnumeric(text) OWNER TO roland; + + -- Valid statements. + INSERT INTO foo VALUES (1), (2), (3); + CREATE TABLE t (i INT8); + + -- Statements that CRDB can parse, but IMPORT does not support. + -- These are processed during the schema pass of IMPORT. + COMMENT ON TABLE t IS 'This should be skipped'; + COMMENT ON DATABASE t IS 'This should be skipped'; + COMMENT ON COLUMN t IS 'This should be skipped'; + + + -- Statements that CRDB can parse, but IMPORT does not support. + -- These are processed during the data ingestion pass of IMPORT. 
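+ -- For instance, the set_config() SELECT and the DELETE below parse cleanly, + -- but are only logged and skipped when ignore_unsupported is set.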
+ SELECT pg_catalog.set_config('search_path', '', false); + DELETE FROM geometry_columns WHERE f_table_name = 'nyc_census_blocks' AND f_table_schema = 'public'; + ` + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + _, _ = w.Write([]byte(data)) + } + })) + defer srv.Close() + t.Run("ignore-unsupported", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE foo; USE foo;") + sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", srv.URL) + // Check that statements that are not expected to be ignored are still + // processed. + sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) + }) + + t.Run("dont-ignore-unsupported", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE foo1; USE foo1;") + sqlDB.ExpectErr(t, "syntax error", "IMPORT PGDUMP ($1)", srv.URL) + }) + + t.Run("require-both-unsupported-options", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE foo2; USE foo2;") + ignoredLog := `userfile:///ignore.log` + sqlDB.ExpectErr(t, "cannot log unsupported PGDUMP stmts without `ignore_unsupported` option", + "IMPORT PGDUMP ($1) WITH ignored_stmt_log=$2", srv.URL, ignoredLog) + }) + + t.Run("log-unsupported-stmts", func(t *testing.T) { + sqlDB.Exec(t, "CREATE DATABASE foo3; USE foo3;") + ignoredLog := `userfile:///ignore.log` + sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported, ignored_stmt_log=$2", + srv.URL, ignoredLog) + // Check that statements that are not expected to be ignored are still + // processed. + sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) + + // Read the unsupported log and verify its contents. + store, err := cloudimpl.ExternalStorageFromURI(ctx, ignoredLog, + base.ExternalIODirConfig{}, + tc.Servers[0].ClusterSettings(), + blobs.TestEmptyBlobClientFactory, + security.RootUserName(), + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB()) + require.NoError(t, err) + defer store.Close() + content, err := store.ReadFile(ctx, pgDumpUnsupportedSchemaStmtLog) + require.NoError(t, err) + descBytes, err := ioutil.ReadAll(content) + require.NoError(t, err) + expectedSchemaLog := `Unsupported statements during schema parse phase: + +create trigger: could not be parsed +revoke privileges on sequence: could not be parsed +revoke privileges on sequence: could not be parsed +grant privileges on sequence: could not be parsed +grant privileges on sequence: could not be parsed +comment on extension: could not be parsed +create extension if not exists with: could not be parsed +create function: could not be parsed +alter function: could not be parsed +COMMENT ON TABLE t IS 'This should be skipped': unsupported by IMPORT + +Logging 10 out of 13 ignored statements. +` + require.Equal(t, []byte(expectedSchemaLog), descBytes) + + expectedDataLog := `Unsupported statements during data ingestion phase: + +unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT +unsupported *tree.Delete statement: DELETE FROM geometry_columns WHERE (f_table_name = 'nyc_census_blocks') AND (f_table_schema = 'public'): unsupported by IMPORT + +Logging 2 out of 2 ignored statements. +` + + content, err = store.ReadFile(ctx, pgDumpUnsupportedDataStmtLog) + require.NoError(t, err) + descBytes, err = ioutil.ReadAll(content) + require.NoError(t, err) + require.Equal(t, []byte(expectedDataLog), descBytes) + }) +} + // TestImportPgDumpGeo tests that a file with SQLFn classes can be // imported.
These are functions like AddGeometryColumn which create and // execute SQL when called (!). They are, for example, used by shp2pgsql @@ -5701,7 +5814,7 @@ func TestImportPgDumpGeo(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(conn) sqlDB.Exec(t, `CREATE DATABASE importdb; SET DATABASE = importdb`) - sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_shp2pgsql.sql'") + sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_shp2pgsql.sql' WITH ignore_unsupported") sqlDB.Exec(t, `CREATE DATABASE execdb; SET DATABASE = execdb`) geoSQL, err := ioutil.ReadFile(filepath.Join(baseDir, "geo_shp2pgsql.sql")) @@ -5744,7 +5857,7 @@ func TestImportPgDumpGeo(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(conn) sqlDB.Exec(t, `CREATE DATABASE importdb; SET DATABASE = importdb`) - sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_ogr2ogr.sql'") + sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_ogr2ogr.sql' WITH ignore_unsupported") sqlDB.Exec(t, `CREATE DATABASE execdb; SET DATABASE = execdb`) geoSQL, err := ioutil.ReadFile(filepath.Join(baseDir, "geo_ogr2ogr.sql")) @@ -5861,7 +5974,7 @@ func TestImportCockroachDump(t *testing.T) { conn := tc.Conns[0] sqlDB := sqlutils.MakeSQLRunner(conn) - sqlDB.Exec(t, "IMPORT PGDUMP ($1)", "nodelocal://0/cockroachdump/dump.sql") + sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", "nodelocal://0/cockroachdump/dump.sql") sqlDB.CheckQueryResults(t, "SELECT * FROM t ORDER BY i", [][]string{ {"1", "test"}, {"2", "other"}, @@ -5917,7 +6030,7 @@ func TestCreateStatsAfterImport(t *testing.T) { sqlDB.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=true`) - sqlDB.Exec(t, "IMPORT PGDUMP ($1)", "nodelocal://0/cockroachdump/dump.sql") + sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", "nodelocal://0/cockroachdump/dump.sql") // Verify that statistics have been created. sqlDB.CheckQueryResultsRetry(t, diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index 96cbc210d1b4..07af37520c87 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -11,6 +11,7 @@ package importccl import ( "bufio" "context" + "fmt" "io" "regexp" "strings" @@ -42,16 +43,20 @@ import ( ) type postgreStream struct { - s *bufio.Scanner - copy *postgreStreamCopy + ctx context.Context + s *bufio.Scanner + copy *postgreStreamCopy + unsupportedStmtLogger *unsupportedStmtLogger } // newPostgreStream returns a struct that can stream statements from an // io.Reader. -func newPostgreStream(r io.Reader, max int) *postgreStream { +func newPostgreStream( + ctx context.Context, r io.Reader, max int, unsupportedStmtLogger *unsupportedStmtLogger, +) *postgreStream { s := bufio.NewScanner(r) s.Buffer(nil, max) - p := &postgreStream{s: s} + p := &postgreStream{ctx: ctx, s: s, unsupportedStmtLogger: unsupportedStmtLogger} s.Split(p.split) return p } @@ -95,14 +100,19 @@ func (p *postgreStream) Next() (interface{}, error) { for p.s.Scan() { t := p.s.Text() - // Regardless if we can parse the statement, check that it's not something - // we want to ignore. - if isIgnoredStatement(t) { - continue - } + skipOverComments(t) stmts, err := parser.Parse(t) if err != nil { + // There are some statements that CRDB is unable to parse. If the user has + // indicated that they want to skip these stmts during the IMPORT, then do + // so here. 
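+ // Only parse failures typed as *tree.UnsupportedError are skipped here; the + // error's FeatureName identifies the offending construct and is what gets + // logged. Genuine syntax errors still fail the IMPORT.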
+ if p.unsupportedStmtLogger.ignoreUnsupported && errors.HasType(err, (*tree.UnsupportedError)(nil)) { + if unsupportedErr := (*tree.UnsupportedError)(nil); errors.As(err, &unsupportedErr) { + p.unsupportedStmtLogger.log(unsupportedErr.FeatureName, true /* isParseError */) + } + continue + } return nil, err } switch len(stmts) { @@ -145,20 +155,10 @@ func (p *postgreStream) Next() (interface{}, error) { } var ( - ignoreComments = regexp.MustCompile(`^\s*(--.*)`) - ignoreStatements = []*regexp.Regexp{ - regexp.MustCompile("(?i)^alter function"), - regexp.MustCompile("(?i)^alter sequence .* owned by"), - regexp.MustCompile("(?i)^comment on"), - regexp.MustCompile("(?i)^create extension"), - regexp.MustCompile("(?i)^create function"), - regexp.MustCompile("(?i)^create trigger"), - regexp.MustCompile("(?i)^grant .* on sequence"), - regexp.MustCompile("(?i)^revoke .* on sequence"), - } + ignoreComments = regexp.MustCompile(`^\s*(--.*)`) ) -func isIgnoredStatement(s string) bool { +func skipOverComments(s string) { // Look for the first line with no whitespace or comments. for { m := ignoreComments.FindStringIndex(s) @@ -167,13 +167,6 @@ func isIgnoredStatement(s string) bool { } s = s[m[1]:] } - s = strings.TrimSpace(s) - for _, re := range ignoreStatements { - if re.MatchString(s) { - return true - } - } - return false } type regclassRewriter struct{} @@ -229,6 +222,7 @@ func readPostgresCreateTable( fks fkHandler, max int, owner security.SQLUsername, + unsupportedStmtLogger *unsupportedStmtLogger, ) ([]*tabledesc.Mutable, error) { // Modify the CreateTable stmt with the various index additions. We do this // instead of creating a full table descriptor first and adding indexes @@ -239,7 +233,7 @@ func readPostgresCreateTable( createTbl := make(map[string]*tree.CreateTable) createSeq := make(map[string]*tree.CreateSequence) tableFKs := make(map[string][]*tree.ForeignKeyConstraintTableDef) - ps := newPostgreStream(input, max) + ps := newPostgreStream(ctx, input, max, unsupportedStmtLogger) for { stmt, err := ps.Next() if err == io.EOF { @@ -310,7 +304,8 @@ func readPostgresCreateTable( if err != nil { return nil, errors.Wrap(err, "postgres parse error") } - if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p, parentID); err != nil { + if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p, + parentID, unsupportedStmtLogger); err != nil { return nil, err } } @@ -327,7 +322,9 @@ func readPostgresStmt( stmt interface{}, p sql.JobExecContext, parentID descpb.ID, + unsupportedStmtLogger *unsupportedStmtLogger, ) error { + ignoreUnsupportedStmts := unsupportedStmtLogger.ignoreUnsupported switch stmt := stmt.(type) { case *tree.CreateTable: name, err := getTableName(&stmt.Table) @@ -402,6 +399,10 @@ func readPostgresStmt( } case *tree.AlterTableAddColumn: if cmd.IfNotExists { + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + continue + } return errors.Errorf("unsupported statement: %s", stmt) } create.Defs = append(create.Defs, cmd.ColumnDef) @@ -422,14 +423,20 @@ func readPostgresStmt( if !found { return colinfo.NewUndefinedColumnError(cmd.Column.String()) } - case *tree.AlterTableValidateConstraint: - // ignore default: + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + continue + } return errors.Errorf("unsupported statement: %s", stmt) } } case *tree.AlterTableOwner: - // ignore + if ignoreUnsupportedStmts { + 
unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + return nil + } + return errors.Errorf("unsupported statement: %s", stmt) case *tree.CreateSequence: name, err := getTableName(&stmt.Name) if err != nil { @@ -438,6 +445,12 @@ func readPostgresStmt( if match == "" || match == name { createSeq[name] = stmt } + case *tree.AlterSequence: + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(stmt.String(), false /* isParseError */) + return nil + } + return errors.Errorf("unsupported %T statement: %s", stmt, stmt) // Some SELECT statements mutate schema. Search for those here. case *tree.Select: switch sel := stmt.Select.(type) { @@ -461,12 +474,13 @@ func readPostgresStmt( // Search for a SQLFn, which returns a SQL string to execute. fn := ov.SQLFn if fn == nil { - switch f := expr.Func.String(); f { - case "set_config", "setval": + err := errors.Errorf("unsupported function call: %s in stmt: %s", + expr.Func.String(), stmt.String()) + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(err.Error(), false /* isParseError */) continue - default: - return errors.Errorf("unsupported function call: %s", expr.Func.String()) } + return err } // Attempt to convert all func exprs to datums. datums := make(tree.Datums, len(expr.Exprs)) @@ -493,7 +507,8 @@ func readPostgresStmt( for _, fnStmt := range fnStmts { switch ast := fnStmt.AST.(type) { case *tree.AlterTable: - if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast, p, parentID); err != nil { + if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, + tableFKs, ast, p, parentID, unsupportedStmtLogger); err != nil { return err } default: @@ -502,11 +517,21 @@ func readPostgresStmt( } } default: - return errors.Errorf("unsupported %T SELECT expr: %s", expr, expr) + err := errors.Errorf("unsupported %T SELECT expr: %s", expr, expr) + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } } default: - return errors.Errorf("unsupported %T SELECT: %s", sel, sel) + err := errors.Errorf("unsupported %T SELECT %s", sel, sel) + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + return nil + } + return err } case *tree.DropTable: names := stmt.Names @@ -535,17 +560,33 @@ func readPostgresStmt( } } case *tree.BeginTransaction, *tree.CommitTransaction: - // ignore txns. - case *tree.SetVar, *tree.Insert, *tree.CopyFrom, copyData, *tree.Delete: - // ignore SETs and DMLs. - case *tree.Analyze: - // ANALYZE is syntatictic sugar for CreateStatistics. It can be ignored because - // the auto stats stuff will pick up the changes and run if needed. + // Ignore transaction statements as they have no meaning during an IMPORT. + // TODO(during review): Should we guard these statements under the + // ignore_unsupported flag as well? + case *tree.Insert, *tree.CopyFrom, *tree.Delete, copyData: + // handled during the data ingestion pass. + case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable, + *tree.CommentOnIndex, *tree.CommentOnColumn, *tree.SetVar, *tree.Analyze: + // These are the statements that can be parsed by CRDB but are not + // supported, or are not required to be processed, during an IMPORT. + // - ignore txns. + // - ignore SETs and DMLs. + // - ANALYZE is syntactic sugar for CreateStatistics. It can be ignored + // because the auto stats stuff will pick up the changes and run if needed. 
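+ // As with the cases above, these statements are logged and skipped when + // ignore_unsupported is set, and otherwise fail the IMPORT.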
+ if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) + return nil + } + return errors.Errorf("unsupported %T statement: %s", stmt, stmt) case error: if !errors.Is(stmt, errCopyDone) { return stmt } default: + if ignoreUnsupportedStmts { + unsupportedStmtLogger.log(fmt.Sprintf("%s", stmt), false /* isParseError */) + return nil + } return errors.Errorf("unsupported %T statement: %s", stmt, stmt) } return nil @@ -575,14 +616,15 @@ func getTableName2(u *tree.UnresolvedObjectName) (string, error) { } type pgDumpReader struct { - tableDescs map[string]catalog.TableDescriptor - tables map[string]*row.DatumRowConverter - descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable - kvCh chan row.KVBatch - opts roachpb.PgDumpOptions - walltime int64 - colMap map[*row.DatumRowConverter](map[string]int) - evalCtx *tree.EvalContext + tableDescs map[string]catalog.TableDescriptor + tables map[string]*row.DatumRowConverter + descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable + kvCh chan row.KVBatch + opts roachpb.PgDumpOptions + walltime int64 + colMap map[*row.DatumRowConverter](map[string]int) + unsupportedStmtLogger *unsupportedStmtLogger + evalCtx *tree.EvalContext } var _ inputConverter = &pgDumpReader{} @@ -646,7 +688,16 @@ func (m *pgDumpReader) readFiles( makeExternalStorage cloud.ExternalStorageFactory, user security.SQLUsername, ) error { - return readInputFiles(ctx, dataFiles, resumePos, format, m.readFile, makeExternalStorage, user) + // Set up a logger to handle unsupported DML statements seen in the PGDUMP file. + m.unsupportedStmtLogger = makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported, + format.PgDump.IgnoreUnsupportedLog, dataIngestion, makeExternalStorage) + + err := readInputFiles(ctx, dataFiles, resumePos, format, m.readFile, makeExternalStorage, user) + if err != nil { + return err + } + + return m.unsupportedStmtLogger.flush(ctx, user) } func (m *pgDumpReader) readFile( @@ -655,7 +706,7 @@ tableNameToRowsProcessed := make(map[string]int64) var inserts, count int64 rowLimit := m.opts.RowLimit - ps := newPostgreStream(input, int(m.opts.MaxRowSize)) + ps := newPostgreStream(ctx, input, int(m.opts.MaxRowSize), m.unsupportedStmtLogger) semaCtx := tree.MakeSemaContext() for _, conv := range m.tables { conv.KvBatch.Source = inputIdx @@ -700,6 +751,12 @@ timestamp := timestampAfterEpoch(m.walltime) values, ok := i.Rows.Select.(*tree.ValuesClause) if !ok { + if m.unsupportedStmtLogger.ignoreUnsupported { + // Pass the raw statement; log() itself appends the + // ": unsupported by IMPORT" suffix. + m.unsupportedStmtLogger.log(i.Rows.Select.String(), false /* isParseError */) + continue + } return errors.Errorf("unsupported: %s", i.Rows.Select) } inserts++ @@ -846,22 +903,47 @@ // by pg_dump, and thus if it isn't, we don't try to figure out what to do.
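+ // Each shape check below follows the same pattern: log and skip the + // statement under ignore_unsupported, otherwise return the error.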
sc, ok := i.Select.(*tree.SelectClause) if !ok { - return errors.Errorf("unsupported %T Select: %v", i.Select, i.Select) + err := errors.Errorf("unsupported %T Select: %v", i.Select, i.Select) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } if len(sc.Exprs) != 1 { - return errors.Errorf("unsupported %d select args: %v", len(sc.Exprs), sc.Exprs) + err := errors.Errorf("unsupported %d select args: %v", len(sc.Exprs), sc.Exprs) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } fn, ok := sc.Exprs[0].Expr.(*tree.FuncExpr) if !ok { - return errors.Errorf("unsupported select arg %T: %v", sc.Exprs[0].Expr, sc.Exprs[0].Expr) + err := errors.Errorf("unsupported select arg %T: %v", sc.Exprs[0].Expr, sc.Exprs[0].Expr) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } switch funcName := strings.ToLower(fn.Func.String()); funcName { case "search_path", "pg_catalog.set_config": - continue + err := errors.Errorf("unsupported %d fn args in select: %v", len(fn.Exprs), fn.Exprs) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err case "setval", "pg_catalog.setval": if args := len(fn.Exprs); args < 2 || args > 3 { - return errors.Errorf("unsupported %d fn args: %v", len(fn.Exprs), fn.Exprs) + err := errors.Errorf("unsupported %d fn args in select: %v", len(fn.Exprs), fn.Exprs) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } seqname, ok := fn.Exprs[0].(*tree.StrVal) if !ok { @@ -877,7 +959,12 @@ func (m *pgDumpReader) readFile( } seqval, ok := fn.Exprs[1].(*tree.NumVal) if !ok { - return errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[1], fn.Exprs[1]) + err := errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[1], fn.Exprs[1]) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } val, err := seqval.AsInt64() if err != nil { @@ -887,7 +974,12 @@ func (m *pgDumpReader) readFile( if len(fn.Exprs) == 3 { called, ok := fn.Exprs[2].(*tree.DBool) if !ok { - return errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[2], fn.Exprs[2]) + err := errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[2], fn.Exprs[2]) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } isCalled = bool(*called) } @@ -911,25 +1003,28 @@ func (m *pgDumpReader) readFile( case "addgeometrycolumn": // handled during schema extraction. default: - return errors.Errorf("unsupported function: %s", funcName) + err := errors.Errorf("unsupported function %s in stmt %s", funcName, i.Select.String()) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } + case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable, + *tree.CommentOnIndex, *tree.CommentOnColumn, *tree.AlterSequence: + // handled during schema extraction. case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze: - // ignored. 
- case *tree.CreateTable, *tree.AlterTable, *tree.AlterTableOwner, *tree.CreateIndex, *tree.CreateSequence, *tree.DropTable: // handled during schema extraction. - case *tree.Delete: - switch stmt := i.Table.(type) { - case *tree.AliasedTableExpr: - // ogr2ogr has `DELETE FROM geometry_columns / geography_columns ...` statements. - // We're not planning to support this functionality in CRDB, so it is safe to ignore it when countered in PGDUMP. - if tn, ok := stmt.Expr.(*tree.TableName); !(ok && (tn.Table() == "geometry_columns" || tn.Table() == "geography_columns")) { - return errors.Errorf("unsupported DELETE FROM %T statement: %s", stmt, stmt) - } - default: - return errors.Errorf("unsupported %T statement: %s", i, i) - } + case *tree.CreateTable, *tree.AlterTable, *tree.AlterTableOwner, *tree.CreateIndex, + *tree.CreateSequence, *tree.DropTable: + // handled during schema extraction. default: - return errors.Errorf("unsupported %T statement: %v", i, i) + err := errors.Errorf("unsupported %T statement: %v", i, i) + if m.unsupportedStmtLogger.ignoreUnsupported { + m.unsupportedStmtLogger.log(err.Error(), false /* isParseError */) + continue + } + return err } } for _, conv := range m.tables { diff --git a/pkg/ccl/importccl/read_import_pgdump_test.go b/pkg/ccl/importccl/read_import_pgdump_test.go index b883810426ae..0e42cd9b23bf 100644 --- a/pkg/ccl/importccl/read_import_pgdump_test.go +++ b/pkg/ccl/importccl/read_import_pgdump_test.go @@ -39,7 +39,7 @@ select '123456789012345678901234567890123456789012345678901234567890123456789012 -- ` - p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer) + p := newPostgreStream(context.Background(), strings.NewReader(sql), defaultScanBuffer, nil /* unsupportedStmtLogger */) var sb strings.Builder for { s, err := p.Next() @@ -121,7 +121,7 @@ COPY public.t (s) FROM stdin; -- ` - p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer) + p := newPostgreStream(context.Background(), strings.NewReader(sql), defaultScanBuffer, nil /* unsupportedStmtLogger */) var sb strings.Builder for { s, err := p.Next() diff --git a/pkg/ccl/importccl/testdata/pgdump/simple.sql b/pkg/ccl/importccl/testdata/pgdump/simple.sql index e117fea0017e..1d49cfa0336a 100644 --- a/pkg/ccl/importccl/testdata/pgdump/simple.sql +++ b/pkg/ccl/importccl/testdata/pgdump/simple.sql @@ -19,6 +19,20 @@ SET default_tablespace = ''; SET default_with_oids = false; + +-- +-- Name: simple; Type: EXTENSION; Schema: -; Owner: +-- + +CREATE EXTENSION IF NOT EXISTS simple WITH SCHEMA pg_catalog; + + +-- +-- Name: EXTENSION simple; Type: COMMENT; Schema: -; Owner: +-- + +COMMENT ON EXTENSION simple IS 'simple extension'; + -- -- Name: simple; Type: TABLE; Schema: public; Owner: postgres -- diff --git a/pkg/ccl/logictestccl/testdata/logic_test/alter_table_locality b/pkg/ccl/logictestccl/testdata/logic_test/alter_table_locality index b00d11f73981..3bb187aa3de0 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/alter_table_locality +++ b/pkg/ccl/logictestccl/testdata/logic_test/alter_table_locality @@ -1465,15 +1465,15 @@ regional_by_row CREATE TABLE public.regional_by_ query TT SHOW ZONE CONFIGURATION FOR TABLE regional_by_row ---- -TABLE regional_by_row ALTER TABLE regional_by_row CONFIGURE ZONE USING - range_min_bytes = 134217728, - range_max_bytes = 536870912, - gc.ttlseconds = 90000, - num_replicas = 5, - num_voters = 3, - constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', - voter_constraints = '[+region=ca-central-1]', -
lease_preferences = '[[+region=ca-central-1]]' +DATABASE alter_locality_test ALTER DATABASE alter_locality_test CONFIGURE ZONE USING + range_min_bytes = 134217728, + range_max_bytes = 536870912, + gc.ttlseconds = 90000, + num_replicas = 5, + num_voters = 3, + constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', + voter_constraints = '[+region=ca-central-1]', + lease_preferences = '[[+region=ca-central-1]]' statement ok DROP TABLE regional_by_row; @@ -1537,15 +1537,15 @@ regional_by_row CREATE TABLE public.regional_by_ query TT SHOW ZONE CONFIGURATION FOR TABLE regional_by_row ---- -TABLE regional_by_row ALTER TABLE regional_by_row CONFIGURE ZONE USING - range_min_bytes = 134217728, - range_max_bytes = 536870912, - gc.ttlseconds = 90000, - num_replicas = 5, - num_voters = 3, - constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', - voter_constraints = '[+region=ca-central-1]', - lease_preferences = '[[+region=ca-central-1]]' +DATABASE alter_locality_test ALTER DATABASE alter_locality_test CONFIGURE ZONE USING + range_min_bytes = 134217728, + range_max_bytes = 536870912, + gc.ttlseconds = 90000, + num_replicas = 5, + num_voters = 3, + constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', + voter_constraints = '[+region=ca-central-1]', + lease_preferences = '[[+region=ca-central-1]]' statement ok DROP TABLE regional_by_row; @@ -1831,15 +1831,15 @@ regional_by_row_as CREATE TABLE public.regional_by_ query TT SHOW ZONE CONFIGURATION FOR TABLE regional_by_row_as ---- -TABLE regional_by_row_as ALTER TABLE regional_by_row_as CONFIGURE ZONE USING - range_min_bytes = 134217728, - range_max_bytes = 536870912, - gc.ttlseconds = 90000, - num_replicas = 5, - num_voters = 3, - constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', - voter_constraints = '[+region=ca-central-1]', - lease_preferences = '[[+region=ca-central-1]]' +DATABASE alter_locality_test ALTER DATABASE alter_locality_test CONFIGURE ZONE USING + range_min_bytes = 134217728, + range_max_bytes = 536870912, + gc.ttlseconds = 90000, + num_replicas = 5, + num_voters = 3, + constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', + voter_constraints = '[+region=ca-central-1]', + lease_preferences = '[[+region=ca-central-1]]' statement ok DROP TABLE regional_by_row_as; @@ -1905,15 +1905,15 @@ regional_by_row_as CREATE TABLE public.regional_by_ query TT SHOW ZONE CONFIGURATION FOR TABLE regional_by_row_as ---- -TABLE regional_by_row_as ALTER TABLE regional_by_row_as CONFIGURE ZONE USING - range_min_bytes = 134217728, - range_max_bytes = 536870912, - gc.ttlseconds = 90000, - num_replicas = 5, - num_voters = 3, - constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', - voter_constraints = '[+region=ca-central-1]', - lease_preferences = '[[+region=ca-central-1]]' +DATABASE alter_locality_test ALTER DATABASE alter_locality_test CONFIGURE ZONE USING + range_min_bytes = 134217728, + range_max_bytes = 536870912, + gc.ttlseconds = 90000, + num_replicas = 5, + num_voters = 3, + constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}', + voter_constraints = '[+region=ca-central-1]', + lease_preferences = '[[+region=ca-central-1]]' statement ok DROP TABLE regional_by_row_as; diff --git a/pkg/kv/kvserver/batcheval/cmd_revert_range.go b/pkg/kv/kvserver/batcheval/cmd_revert_range.go index 
afc1cae3fec2..e93fc8a19880 100644 --- a/pkg/kv/kvserver/batcheval/cmd_revert_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_revert_range.go @@ -57,6 +57,8 @@ func isEmptyKeyTimeRange( return !ok, err } +const maxRevertRangeBatchBytes = 32 << 20 + // RevertRange wipes all MVCC versions more recent than TargetTime (up to the // command timestamp) of the keys covered by the specified span, adjusting the // MVCC stats accordingly. @@ -88,6 +90,7 @@ func RevertRange( resume, err := storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys, + maxRevertRangeBatchBytes, args.EnableTimeBoundIteratorOptimization) if err != nil { return result.Result{}, err diff --git a/pkg/roachpb/io-formats.pb.go b/pkg/roachpb/io-formats.pb.go index 45b572a6e788..a8a9295af5da 100644 --- a/pkg/roachpb/io-formats.pb.go +++ b/pkg/roachpb/io-formats.pb.go @@ -68,7 +68,7 @@ func (x *IOFileFormat_FileFormat) UnmarshalJSON(data []byte) error { return nil } func (IOFileFormat_FileFormat) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{0, 0} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{0, 0} } type IOFileFormat_Compression int32 @@ -110,7 +110,7 @@ func (x *IOFileFormat_Compression) UnmarshalJSON(data []byte) error { return nil } func (IOFileFormat_Compression) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{0, 1} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{0, 1} } type MySQLOutfileOptions_Enclose int32 @@ -149,7 +149,7 @@ func (x *MySQLOutfileOptions_Enclose) UnmarshalJSON(data []byte) error { return nil } func (MySQLOutfileOptions_Enclose) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{2, 0} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{2, 0} } type AvroOptions_Format int32 @@ -191,7 +191,7 @@ func (x *AvroOptions_Format) UnmarshalJSON(data []byte) error { return nil } func (AvroOptions_Format) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{6, 0} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{6, 0} } type IOFileFormat struct { @@ -211,7 +211,7 @@ func (m *IOFileFormat) Reset() { *m = IOFileFormat{} } func (m *IOFileFormat) String() string { return proto.CompactTextString(m) } func (*IOFileFormat) ProtoMessage() {} func (*IOFileFormat) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{0} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{0} } func (m *IOFileFormat) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -258,7 +258,7 @@ func (m *CSVOptions) Reset() { *m = CSVOptions{} } func (m *CSVOptions) String() string { return proto.CompactTextString(m) } func (*CSVOptions) ProtoMessage() {} func (*CSVOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{1} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{1} } func (m *CSVOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -309,7 +309,7 @@ func (m *MySQLOutfileOptions) Reset() { *m = MySQLOutfileOptions{} } func (m *MySQLOutfileOptions) String() string { return proto.CompactTextString(m) } func (*MySQLOutfileOptions) ProtoMessage() {} func (*MySQLOutfileOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{2} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{2} } func (m 
*MySQLOutfileOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -348,7 +348,7 @@ func (m *PgCopyOptions) Reset() { *m = PgCopyOptions{} } func (m *PgCopyOptions) String() string { return proto.CompactTextString(m) } func (*PgCopyOptions) ProtoMessage() {} func (*PgCopyOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{3} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{3} } func (m *PgCopyOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -380,13 +380,21 @@ type PgDumpOptions struct { // Indicates the number of rows to import per table. // Must be a non-zero positive number. RowLimit int64 `protobuf:"varint,2,opt,name=row_limit,json=rowLimit" json:"row_limit"` + // Indicates if all unparseable and parseable, but unimplemented PGDUMP stmts + // should be ignored during IMPORT. + IgnoreUnsupported bool `protobuf:"varint,3,opt,name=ignore_unsupported,json=ignoreUnsupported" json:"ignore_unsupported"` + // Points to the destination where unsupported statements during a PGDUMP + // import should be logged. This can only be used when ignore_unsupported is + // specified, otherwise the IMPORT errors out on encountering an unsupported + // stmt. + IgnoreUnsupportedLog string `protobuf:"bytes,4,opt,name=ignore_unsupported_log,json=ignoreUnsupportedLog" json:"ignore_unsupported_log"` } func (m *PgDumpOptions) Reset() { *m = PgDumpOptions{} } func (m *PgDumpOptions) String() string { return proto.CompactTextString(m) } func (*PgDumpOptions) ProtoMessage() {} func (*PgDumpOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{4} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{4} } func (m *PgDumpOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -421,7 +429,7 @@ func (m *MysqldumpOptions) Reset() { *m = MysqldumpOptions{} } func (m *MysqldumpOptions) String() string { return proto.CompactTextString(m) } func (*MysqldumpOptions) ProtoMessage() {} func (*MysqldumpOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{5} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{5} } func (m *MysqldumpOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -464,7 +472,7 @@ func (m *AvroOptions) Reset() { *m = AvroOptions{} } func (m *AvroOptions) String() string { return proto.CompactTextString(m) } func (*AvroOptions) ProtoMessage() {} func (*AvroOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_io_formats_cf12087409a41794, []int{6} + return fileDescriptor_io_formats_b77b03f938e9e2ae, []int{6} } func (m *AvroOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -728,6 +736,18 @@ func (m *PgDumpOptions) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x10 i++ i = encodeVarintIoFormats(dAtA, i, uint64(m.RowLimit)) + dAtA[i] = 0x18 + i++ + if m.IgnoreUnsupported { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + dAtA[i] = 0x22 + i++ + i = encodeVarintIoFormats(dAtA, i, uint64(len(m.IgnoreUnsupportedLog))) + i += copy(dAtA[i:], m.IgnoreUnsupportedLog) return i, nil } @@ -887,6 +907,9 @@ func (m *PgDumpOptions) Size() (n int) { _ = l n += 1 + sovIoFormats(uint64(m.MaxRowSize)) n += 1 + sovIoFormats(uint64(m.RowLimit)) + n += 2 + l = len(m.IgnoreUnsupportedLog) + n += 1 + l + sovIoFormats(uint64(l)) return n } @@ -1810,6 +1833,55 @@ func (m *PgDumpOptions) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d 
for field IgnoreUnsupported", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIoFormats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.IgnoreUnsupported = bool(v != 0) + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field IgnoreUnsupportedLog", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIoFormats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthIoFormats + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.IgnoreUnsupportedLog = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipIoFormats(dAtA[iNdEx:]) @@ -2181,68 +2253,71 @@ var ( ) func init() { - proto.RegisterFile("roachpb/io-formats.proto", fileDescriptor_io_formats_cf12087409a41794) -} - -var fileDescriptor_io_formats_cf12087409a41794 = []byte{ - // 938 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x5f, 0x6f, 0xe3, 0x44, - 0x10, 0xb7, 0xf3, 0xcf, 0xf1, 0x24, 0x69, 0x97, 0x85, 0x07, 0xeb, 0x04, 0x26, 0xe4, 0x38, 0xd4, - 0x03, 0xce, 0x95, 0x2a, 0x2a, 0xf1, 0x86, 0xae, 0xb9, 0x96, 0xeb, 0xe9, 0x9a, 0x70, 0x89, 0x38, - 0x21, 0x5e, 0x2c, 0x63, 0x6f, 0x53, 0x53, 0xdb, 0xeb, 0x7a, 0x9d, 0xe4, 0x72, 0x9f, 0x82, 0xcf, - 0xc4, 0x53, 0x1f, 0xef, 0x8d, 0x93, 0x90, 0x10, 0xb4, 0xdf, 0x03, 0xa1, 0x5d, 0xaf, 0x13, 0xbb, - 0x35, 0x77, 0x7d, 0x1b, 0xcd, 0x6f, 0x7e, 0xb3, 0x33, 0xf3, 0x9b, 0xdd, 0x05, 0x23, 0xa1, 0x8e, - 0x7b, 0x16, 0xff, 0xb2, 0xeb, 0xd3, 0x47, 0xa7, 0x34, 0x09, 0x9d, 0x94, 0x59, 0x71, 0x42, 0x53, - 0x8a, 0x3f, 0x70, 0xa9, 0x7b, 0x2e, 0x50, 0x4b, 0xc6, 0xdc, 0xfb, 0x68, 0x46, 0x67, 0x54, 0xa0, - 0xbb, 0xdc, 0xca, 0x02, 0x07, 0xff, 0x36, 0xa1, 0x7b, 0x3c, 0x3e, 0xf2, 0x03, 0x72, 0x24, 0x12, - 0xe0, 0xa7, 0xd0, 0xca, 0x52, 0x19, 0x6a, 0x5f, 0xdd, 0xd9, 0xda, 0xfb, 0xd2, 0xba, 0x95, 0xca, - 0x2a, 0x12, 0xac, 0x8d, 0x79, 0xd0, 0xb8, 0xfc, 0xeb, 0x53, 0x65, 0x22, 0xf9, 0x78, 0x1f, 0xea, - 0x2e, 0x5b, 0x18, 0xb5, 0xbe, 0xba, 0xd3, 0xd9, 0xfb, 0xa4, 0x22, 0xcd, 0x70, 0xfa, 0x72, 0x1c, - 0xa7, 0x3e, 0x8d, 0x98, 0x64, 0xf2, 0x78, 0x7c, 0x0c, 0x7a, 0xb8, 0x62, 0x17, 0x81, 0x4d, 0xe7, - 0xa9, 0x51, 0x17, 0xe4, 0x2f, 0x2a, 0xc8, 0x27, 0xab, 0xe9, 0x8b, 0xe7, 0xe3, 0x79, 0x7a, 0xea, - 0x07, 0xa4, 0x9c, 0xa5, 0x2d, 0xe8, 0xe3, 0x79, 0x8a, 0xbf, 0x03, 0x2d, 0x9e, 0xd9, 0x2e, 0x8d, - 0x57, 0x46, 0x43, 0x24, 0xea, 0x57, 0x24, 0xfa, 0x61, 0x36, 0xa4, 0xf1, 0xaa, 0x9c, 0xa2, 0x15, - 0x0b, 0x27, 0x9e, 0x42, 0xc7, 0xa5, 0x61, 0x9c, 0x10, 0xc6, 0x7c, 0x1a, 0x19, 0x4d, 0x31, 0x91, - 0xaf, 0xde, 0x37, 0x91, 0xe1, 0x86, 0x22, 0xf3, 0x15, 0xb3, 0xc8, 0xaa, 0xbc, 0x79, 0x18, 0x1b, - 0xad, 0x77, 0x54, 0xf5, 0x64, 0x1e, 0xc6, 0xb7, 0xaa, 0xe2, 0x4e, 0xfc, 0x10, 0x7a, 0xcc, 0x59, - 0x10, 0x3b, 0x21, 0xbf, 0x12, 0x37, 0x25, 0x9e, 0xa1, 0xf5, 0xd5, 0x9d, 0xb6, 0x0c, 0xea, 0x72, - 0x68, 0x22, 0x11, 0xfc, 0x2d, 0x34, 0x9c, 0x45, 0x42, 0x8d, 0xb6, 0x38, 0xc8, 0xac, 0x38, 0xe8, - 0xf1, 0x22, 0xa1, 0xe5, 0x63, 0x04, 0x03, 0x3f, 0x05, 0xc8, 0x64, 0x10, 0x85, 0xea, 0x82, 0x7f, - 0xbf, 0x52, 0x07, 0x76, 0x11, 0x78, 0xb7, 0x6a, 0xcd, 0x34, 0xe4, 0xe5, 0x0e, 0x08, 
0x40, 0x61, - 0xbf, 0x3a, 0xa0, 0xfd, 0x18, 0x9d, 0x47, 0x74, 0x19, 0x21, 0x05, 0x6b, 0x50, 0x1f, 0x4e, 0x5f, - 0x22, 0x15, 0x23, 0xe8, 0x9e, 0x48, 0xd5, 0xb8, 0xa0, 0xa8, 0x86, 0x7b, 0xa0, 0xaf, 0x53, 0xa3, - 0x3a, 0x06, 0x68, 0x65, 0x42, 0xa1, 0x46, 0x66, 0xf3, 0xd4, 0xa8, 0x89, 0xdb, 0xd0, 0xe0, 0x1d, - 0xa0, 0xd6, 0x60, 0x1f, 0x3a, 0x85, 0xc1, 0x0b, 0x60, 0x9e, 0x52, 0xa4, 0x70, 0x6b, 0x44, 0x23, - 0x82, 0x54, 0x6e, 0x7d, 0xff, 0xda, 0x8f, 0x51, 0x8d, 0x5b, 0x07, 0xdc, 0xaa, 0x0f, 0xfe, 0x54, - 0x01, 0x36, 0x8b, 0x88, 0xef, 0x41, 0xd3, 0xa5, 0x61, 0xe8, 0x88, 0xed, 0x6f, 0xca, 0x66, 0x32, - 0x17, 0x36, 0x41, 0xe3, 0x06, 0x89, 0x52, 0xb1, 0xd4, 0x39, 0x9a, 0x3b, 0xb9, 0x2e, 0xd1, 0x3c, - 0x08, 0x6c, 0x12, 0xb9, 0xd4, 0xf3, 0xa3, 0x99, 0xd8, 0x5e, 0x5d, 0x44, 0xa9, 0x93, 0x2e, 0x87, - 0x0e, 0x25, 0x82, 0x0d, 0x68, 0xb0, 0x73, 0x3f, 0x16, 0x6b, 0xd9, 0xcb, 0xe7, 0xce, 0x3d, 0x42, - 0xdc, 0x34, 0xf1, 0xdd, 0xd4, 0xbe, 0x98, 0xd3, 0x94, 0x30, 0xb1, 0x74, 0x1b, 0x71, 0x05, 0xf4, - 0x42, 0x20, 0xf8, 0x33, 0xd0, 0x13, 0xba, 0xb4, 0x03, 0x3f, 0xf4, 0x53, 0xb1, 0x4a, 0xf5, 0xfc, - 0x06, 0x24, 0x74, 0xf9, 0x9c, 0x7b, 0x07, 0xbf, 0xd7, 0xe1, 0xc3, 0x8a, 0x9b, 0xc2, 0x4f, 0xe1, - 0x54, 0x46, 0x62, 0x27, 0x71, 0x52, 0x9a, 0x94, 0xda, 0xed, 0x26, 0x74, 0x39, 0xcd, 0x11, 0xfc, - 0x08, 0xb6, 0x4f, 0x7d, 0x12, 0x78, 0x85, 0xe0, 0x62, 0xf7, 0x5b, 0x02, 0xdc, 0x84, 0x8f, 0x40, - 0x23, 0x91, 0x1b, 0x50, 0x46, 0x44, 0xfb, 0x5b, 0x7b, 0xd6, 0xdd, 0x2e, 0xaf, 0x75, 0x98, 0xb1, - 0xf2, 0xa1, 0xca, 0x24, 0xb8, 0x0f, 0x6d, 0x69, 0x26, 0x62, 0x5a, 0xf9, 0xb9, 0x6b, 0x2f, 0xbe, - 0x0f, 0x70, 0xe6, 0x30, 0x9b, 0x30, 0xd7, 0x89, 0x49, 0x69, 0x5c, 0xfa, 0x99, 0xc3, 0x0e, 0x85, - 0x1b, 0x7f, 0x0c, 0x2d, 0x19, 0xd0, 0x2a, 0x24, 0x91, 0xbe, 0xb5, 0x1c, 0x5a, 0x95, 0x1c, 0x65, - 0x4d, 0xdb, 0xff, 0xab, 0x69, 0x49, 0x0e, 0xa8, 0x94, 0xc3, 0x02, 0x4d, 0xb6, 0x89, 0x75, 0x68, - 0x8e, 0xc8, 0x82, 0x24, 0x48, 0xe1, 0xfb, 0xfc, 0x38, 0x58, 0x3a, 0x2b, 0x86, 0x54, 0xdc, 0x85, - 0x76, 0x36, 0x10, 0x27, 0x40, 0xb5, 0x67, 0x8d, 0xb6, 0x8e, 0x60, 0xc0, 0xa0, 0x57, 0x7a, 0xa4, - 0xf0, 0x00, 0x74, 0x8f, 0x88, 0x73, 0x48, 0x59, 0xb9, 0x8d, 0x9b, 0xb7, 0xc4, 0xab, 0x13, 0x5a, - 0xe9, 0x79, 0x4b, 0xdc, 0x83, 0x3f, 0x07, 0x08, 0x9d, 0x57, 0x13, 0xba, 0x9c, 0xfa, 0xaf, 0x33, - 0x91, 0x72, 0x7a, 0xc1, 0x3f, 0xf8, 0x89, 0x1f, 0x5a, 0x78, 0x83, 0x6e, 0xd0, 0xd4, 0x6a, 0x5a, - 0x79, 0x08, 0xb5, 0xca, 0x21, 0xec, 0x03, 0xba, 0xf9, 0x68, 0x94, 0x69, 0x6a, 0x25, 0xed, 0x8f, - 0x1a, 0x74, 0x0a, 0x8f, 0x15, 0x1e, 0xde, 0xf8, 0xa8, 0x1e, 0xbc, 0xfb, 0x71, 0xb3, 0x2a, 0xff, - 0xa8, 0x07, 0xd0, 0x91, 0xb7, 0x2d, 0xa4, 0x1e, 0x11, 0x05, 0xe7, 0xcb, 0x03, 0x19, 0x70, 0x42, - 0x3d, 0xc2, 0x7b, 0x67, 0xee, 0x19, 0x09, 0x9d, 0x67, 0xd3, 0xf1, 0xa8, 0x70, 0xad, 0x79, 0xd4, - 0xda, 0x8f, 0xbf, 0x86, 0xed, 0xd0, 0x79, 0x65, 0x27, 0xc4, 0xa5, 0x89, 0x67, 0x33, 0x3e, 0xa6, - 0xe2, 0xc6, 0xf6, 0xf8, 0x98, 0x04, 0x26, 0x26, 0xb5, 0x0b, 0x28, 0x8f, 0x5c, 0x5f, 0xac, 0x66, - 0x21, 0x7c, 0x3b, 0x43, 0x37, 0x37, 0xeb, 0x0e, 0xd7, 0xfd, 0x1b, 0x68, 0xc9, 0x67, 0x56, 0x83, - 0xfa, 0x78, 0x78, 0x84, 0x14, 0xbc, 0x0d, 0x9d, 0x83, 0xe3, 0x91, 0x3d, 0x39, 0x1c, 0x8e, 0x27, - 0x4f, 0xa6, 0xd9, 0x53, 0xcb, 0xab, 0x5d, 0x7b, 0x6a, 0x07, 0x0f, 0x2f, 0xff, 0x31, 0x95, 0xcb, - 0x2b, 0x53, 0x7d, 0x73, 0x65, 0xaa, 0x6f, 0xaf, 0x4c, 0xf5, 0xef, 0x2b, 0x53, 0xfd, 0xed, 0xda, - 0x54, 0xde, 0x5c, 0x9b, 0xca, 0xdb, 0x6b, 0x53, 0xf9, 0x59, 0x93, 0x03, 0xfd, 0x2f, 0x00, 0x00, - 0xff, 0xff, 0xf9, 0x03, 0xbb, 0x68, 0x72, 0x08, 0x00, 0x00, + proto.RegisterFile("roachpb/io-formats.proto", 
fileDescriptor_io_formats_b77b03f938e9e2ae) +} + +var fileDescriptor_io_formats_b77b03f938e9e2ae = []byte{ + // 986 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcf, 0x6e, 0xdb, 0xc6, + 0x13, 0x16, 0xf5, 0x8f, 0xe2, 0x48, 0xb2, 0x99, 0xfd, 0x05, 0x3f, 0x10, 0x41, 0xcb, 0xaa, 0x4a, + 0x53, 0x38, 0x6d, 0x23, 0x03, 0x6e, 0x0d, 0x14, 0xbd, 0x14, 0xb1, 0x62, 0x37, 0x0e, 0x6c, 0xa9, + 0x91, 0x90, 0x1c, 0x7a, 0x21, 0x58, 0x72, 0x2d, 0xb3, 0x26, 0xb9, 0xf4, 0x2e, 0x29, 0x45, 0x79, + 0x8a, 0x3e, 0x53, 0x4f, 0xee, 0x2d, 0xb7, 0x06, 0x28, 0x50, 0xb4, 0xf6, 0x7b, 0x14, 0xc5, 0x2e, + 0x97, 0x12, 0x69, 0xb1, 0x69, 0x6e, 0x83, 0xf9, 0xe6, 0x1b, 0xce, 0xcc, 0x37, 0x3b, 0x04, 0x83, + 0x12, 0xdb, 0x39, 0x8f, 0x7e, 0xdc, 0xf5, 0xc8, 0xa3, 0x33, 0x42, 0x03, 0x3b, 0x66, 0x83, 0x88, + 0x92, 0x98, 0xa0, 0x3b, 0x0e, 0x71, 0x2e, 0x04, 0x3a, 0x90, 0x31, 0xf7, 0xee, 0xce, 0xc8, 0x8c, + 0x08, 0x74, 0x97, 0x5b, 0x69, 0x60, 0xff, 0xef, 0x06, 0x74, 0x8e, 0xc7, 0x47, 0x9e, 0x8f, 0x8f, + 0x44, 0x02, 0xf4, 0x14, 0x9a, 0x69, 0x2a, 0x43, 0xe9, 0x29, 0x3b, 0x5b, 0x7b, 0x9f, 0x0d, 0x36, + 0x52, 0x0d, 0xf2, 0x84, 0xc1, 0xda, 0x3c, 0xa8, 0x5f, 0xfd, 0xf1, 0x51, 0x65, 0x22, 0xf9, 0x68, + 0x1f, 0x6a, 0x0e, 0x9b, 0x1b, 0xd5, 0x9e, 0xb2, 0xd3, 0xde, 0xfb, 0xb0, 0x24, 0xcd, 0x70, 0xfa, + 0x72, 0x1c, 0xc5, 0x1e, 0x09, 0x99, 0x64, 0xf2, 0x78, 0x74, 0x0c, 0x5a, 0xb0, 0x64, 0x97, 0xbe, + 0x45, 0x92, 0xd8, 0xa8, 0x09, 0xf2, 0xa7, 0x25, 0xe4, 0xd3, 0xe5, 0xf4, 0xf9, 0xc9, 0x38, 0x89, + 0xcf, 0x3c, 0x1f, 0x17, 0xb3, 0xb4, 0x04, 0x7d, 0x9c, 0xc4, 0xe8, 0x5b, 0x50, 0xa3, 0x99, 0xe5, + 0x90, 0x68, 0x69, 0xd4, 0x45, 0xa2, 0x5e, 0x49, 0xa2, 0xef, 0x67, 0x43, 0x12, 0x2d, 0x8b, 0x29, + 0x9a, 0x91, 0x70, 0xa2, 0x29, 0xb4, 0x1d, 0x12, 0x44, 0x14, 0x33, 0xe6, 0x91, 0xd0, 0x68, 0x88, + 0x89, 0x7c, 0xfe, 0x5f, 0x13, 0x19, 0xae, 0x29, 0x32, 0x5f, 0x3e, 0x8b, 0xac, 0xca, 0x4d, 0x82, + 0xc8, 0x68, 0xbe, 0xa3, 0xaa, 0x27, 0x49, 0x10, 0x6d, 0x54, 0xc5, 0x9d, 0xe8, 0x21, 0x74, 0x99, + 0x3d, 0xc7, 0x16, 0xc5, 0x3f, 0x61, 0x27, 0xc6, 0xae, 0xa1, 0xf6, 0x94, 0x9d, 0x96, 0x0c, 0xea, + 0x70, 0x68, 0x22, 0x11, 0xf4, 0x35, 0xd4, 0xed, 0x39, 0x25, 0x46, 0x4b, 0x7c, 0xc8, 0x2c, 0xf9, + 0xd0, 0xe3, 0x39, 0x25, 0xc5, 0xcf, 0x08, 0x06, 0x7a, 0x0a, 0x90, 0xca, 0x20, 0x0a, 0xd5, 0x04, + 0xff, 0x7e, 0xa9, 0x0e, 0xec, 0xd2, 0x77, 0x37, 0x6a, 0x4d, 0x35, 0xe4, 0xe5, 0xf6, 0x31, 0x40, + 0x6e, 0xbf, 0xda, 0xa0, 0xbe, 0x08, 0x2f, 0x42, 0xb2, 0x08, 0xf5, 0x0a, 0x52, 0xa1, 0x36, 0x9c, + 0xbe, 0xd4, 0x15, 0xa4, 0x43, 0xe7, 0x54, 0xaa, 0xc6, 0x05, 0xd5, 0xab, 0xa8, 0x0b, 0xda, 0x2a, + 0xb5, 0x5e, 0x43, 0x00, 0xcd, 0x54, 0x28, 0xbd, 0x9e, 0xda, 0x3c, 0xb5, 0xde, 0x40, 0x2d, 0xa8, + 0xf3, 0x0e, 0xf4, 0x66, 0x7f, 0x1f, 0xda, 0xb9, 0xc1, 0x0b, 0x20, 0x89, 0x89, 0x5e, 0xe1, 0xd6, + 0x88, 0x84, 0x58, 0x57, 0xb8, 0xf5, 0xdd, 0x6b, 0x2f, 0xd2, 0xab, 0xdc, 0x3a, 0xe0, 0x56, 0xad, + 0xff, 0xbb, 0x02, 0xb0, 0x5e, 0x44, 0x74, 0x0f, 0x1a, 0x0e, 0x09, 0x02, 0x5b, 0x6c, 0x7f, 0x43, + 0x36, 0x93, 0xba, 0x90, 0x09, 0x2a, 0x37, 0x70, 0x18, 0x8b, 0xa5, 0xce, 0xd0, 0xcc, 0xc9, 0x75, + 0x09, 0x13, 0xdf, 0xb7, 0x70, 0xe8, 0x10, 0xd7, 0x0b, 0x67, 0x62, 0x7b, 0x35, 0x11, 0xa5, 0x4c, + 0x3a, 0x1c, 0x3a, 0x94, 0x08, 0x32, 0xa0, 0xce, 0x2e, 0xbc, 0x48, 0xac, 0x65, 0x37, 0x9b, 0x3b, + 0xf7, 0x08, 0x71, 0x63, 0xea, 0x39, 0xb1, 0x75, 0x99, 0x90, 0x18, 0x33, 0xb1, 0x74, 0x6b, 0x71, + 0x05, 0xf4, 0x5c, 0x20, 0xe8, 0x63, 0xd0, 0x28, 0x59, 0x58, 0xbe, 0x17, 0x78, 0xb1, 0x58, 0xa5, + 0x5a, 0xf6, 0x02, 0x28, 0x59, 0x9c, 0x70, 0x6f, 0xff, 0x97, 0x1a, 
0xfc, 0xaf, 0xe4, 0xa5, 0xf0, + 0xaf, 0x70, 0x2a, 0xc3, 0x91, 0x4d, 0xed, 0x98, 0xd0, 0x42, 0xbb, 0x1d, 0x4a, 0x16, 0xd3, 0x0c, + 0x41, 0x8f, 0x60, 0xfb, 0xcc, 0xc3, 0xbe, 0x9b, 0x0b, 0xce, 0x77, 0xbf, 0x25, 0xc0, 0x75, 0xf8, + 0x08, 0x54, 0x1c, 0x3a, 0x3e, 0x61, 0x58, 0xb4, 0xbf, 0xb5, 0x37, 0x78, 0xbf, 0xc7, 0x3b, 0x38, + 0x4c, 0x59, 0xd9, 0x50, 0x65, 0x12, 0xd4, 0x83, 0x96, 0x34, 0xa9, 0x98, 0x56, 0xf6, 0xdd, 0x95, + 0x17, 0xdd, 0x07, 0x38, 0xb7, 0x99, 0x85, 0x99, 0x63, 0x47, 0xb8, 0x30, 0x2e, 0xed, 0xdc, 0x66, + 0x87, 0xc2, 0x8d, 0x3e, 0x80, 0xa6, 0x0c, 0x68, 0xe6, 0x92, 0x48, 0xdf, 0x4a, 0x0e, 0xb5, 0x4c, + 0x8e, 0xa2, 0xa6, 0xad, 0x7f, 0xd5, 0xb4, 0x20, 0x07, 0x94, 0xca, 0x31, 0x00, 0x55, 0xb6, 0x89, + 0x34, 0x68, 0x8c, 0xf0, 0x1c, 0x53, 0xbd, 0xc2, 0xf7, 0xf9, 0xb1, 0xbf, 0xb0, 0x97, 0x4c, 0x57, + 0x50, 0x07, 0x5a, 0xe9, 0x40, 0x6c, 0x5f, 0xaf, 0x3e, 0xab, 0xb7, 0x34, 0x1d, 0xfa, 0x0c, 0xba, + 0x85, 0x23, 0x85, 0xfa, 0xa0, 0xb9, 0x58, 0x7c, 0x07, 0x17, 0x95, 0x5b, 0xbb, 0x79, 0x4b, 0xbc, + 0x3a, 0xa1, 0x95, 0x96, 0xb5, 0xc4, 0x3d, 0xe8, 0x13, 0x80, 0xc0, 0x7e, 0x35, 0x21, 0x8b, 0xa9, + 0xf7, 0x3a, 0x15, 0x29, 0xa3, 0xe7, 0xfc, 0xfd, 0x5f, 0x15, 0xfe, 0xd5, 0xdc, 0x11, 0xba, 0xc5, + 0x53, 0xca, 0x79, 0xc5, 0x29, 0x54, 0xcb, 0xa6, 0x80, 0xbe, 0x04, 0xe4, 0xcd, 0x42, 0x42, 0xb1, + 0x95, 0x84, 0x2c, 0x89, 0x22, 0x42, 0xf9, 0x11, 0xab, 0xe5, 0x84, 0xbb, 0x93, 0xe2, 0x2f, 0xd6, + 0x30, 0xfa, 0x06, 0xfe, 0xbf, 0x49, 0xb2, 0x7c, 0x32, 0x13, 0x5b, 0x91, 0x75, 0x78, 0x77, 0x83, + 0x78, 0x42, 0x66, 0xfd, 0x7d, 0xd0, 0x6f, 0x9f, 0xa9, 0x62, 0x9d, 0x4a, 0xa9, 0x5a, 0xbf, 0x55, + 0xa1, 0x9d, 0x3b, 0x8f, 0x68, 0x78, 0xeb, 0xd7, 0xf8, 0xe0, 0xdd, 0xe7, 0x74, 0x50, 0xfa, 0x57, + 0x7c, 0x00, 0x6d, 0xf9, 0xbe, 0x03, 0xe2, 0x62, 0x31, 0xa1, 0xac, 0x6b, 0x48, 0x81, 0x53, 0xe2, + 0x62, 0x3e, 0x6c, 0xe6, 0x9c, 0xe3, 0xc0, 0x7e, 0x36, 0x1d, 0x8f, 0x72, 0x87, 0x84, 0x47, 0xad, + 0xfc, 0xe8, 0x0b, 0xd8, 0x0e, 0xec, 0x57, 0x16, 0xc5, 0x0e, 0xa1, 0xae, 0xc5, 0xb8, 0x2e, 0xf9, + 0x37, 0xd2, 0xe5, 0xba, 0x08, 0x4c, 0x48, 0xb3, 0x0b, 0x7a, 0x16, 0xb9, 0x7a, 0xca, 0x8d, 0x5c, + 0xf8, 0x76, 0x8a, 0xae, 0xdf, 0xf2, 0x7b, 0x1c, 0x98, 0xaf, 0xa0, 0x29, 0x0f, 0xbb, 0x0a, 0xb5, + 0xf1, 0xf0, 0x48, 0xaf, 0xa0, 0x6d, 0x68, 0x1f, 0x1c, 0x8f, 0xac, 0xc9, 0xe1, 0x70, 0x3c, 0x79, + 0x32, 0x4d, 0x8f, 0x3b, 0xaf, 0x76, 0xe5, 0xa9, 0x1e, 0x3c, 0xbc, 0xfa, 0xcb, 0xac, 0x5c, 0x5d, + 0x9b, 0xca, 0x9b, 0x6b, 0x53, 0x79, 0x7b, 0x6d, 0x2a, 0x7f, 0x5e, 0x9b, 0xca, 0xcf, 0x37, 0x66, + 0xe5, 0xcd, 0x8d, 0x59, 0x79, 0x7b, 0x63, 0x56, 0x7e, 0x50, 0xe5, 0x40, 0xff, 0x09, 0x00, 0x00, + 0xff, 0xff, 0x17, 0x19, 0x54, 0x15, 0xe4, 0x08, 0x00, 0x00, } diff --git a/pkg/roachpb/io-formats.proto b/pkg/roachpb/io-formats.proto index b61cf85c9123..e7291619c67b 100644 --- a/pkg/roachpb/io-formats.proto +++ b/pkg/roachpb/io-formats.proto @@ -110,6 +110,14 @@ message PgDumpOptions { // Indicates the number of rows to import per table. // Must be a non-zero positive number. optional int64 row_limit = 2 [(gogoproto.nullable) = false]; + // Indicates if all unparseable and parseable, but unimplemented PGDUMP stmts + // should be ignored during IMPORT. + optional bool ignore_unsupported = 3 [(gogoproto.nullable) = false]; + // Points to the destination where unsupported statements during a PGDUMP + // import should be logged. This can only be used when ignore_unsupported is + // specified, otherwise the IMPORT errors out on encountering an unsupported + // stmt. 
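+  // For example (illustrative usage only; these option names are the ones
+  // registered in import_stmt.go, and the URIs are placeholders):
+  //   IMPORT PGDUMP 'nodelocal://0/dump.sql'
+  //   WITH ignore_unsupported, ignored_stmt_log = 'nodelocal://0/unsupported';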
+ optional string ignore_unsupported_log = 4 [(gogoproto.nullable) = false]; } message MysqldumpOptions { diff --git a/pkg/sql/logictest/testdata/logic_test/union b/pkg/sql/logictest/testdata/logic_test/union index 9a6172e49b26..4997fdddcb3b 100644 --- a/pkg/sql/logictest/testdata/logic_test/union +++ b/pkg/sql/logictest/testdata/logic_test/union @@ -348,3 +348,21 @@ NULL statement ok CREATE TABLE ab (a INT, b INT); SELECT a, b, rowid FROM ab UNION VALUES (1, 2, 3); +DROP TABLE ab; + +# Regression test for #59148. +statement ok +CREATE TABLE ab (a INT4, b INT8); +INSERT INTO ab VALUES (1, 1), (1, 2), (2, 1), (2, 2); + +query I rowsort +SELECT a FROM ab UNION SELECT b FROM ab +---- +1 +2 + +query I rowsort +SELECT b FROM ab UNION SELECT a FROM ab +---- +1 +2 diff --git a/pkg/sql/multiregion_test.go b/pkg/sql/multiregion_test.go index bc4a20aabbee..db761506eb0d 100644 --- a/pkg/sql/multiregion_test.go +++ b/pkg/sql/multiregion_test.go @@ -18,12 +18,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) func TestSettingPrimaryRegionAmidstDrop(t *testing.T) { @@ -127,3 +130,77 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) { return nil }) } + +// TestDroppingPrimaryRegionAsyncJobFailure drops the primary region of the +// database, which results in dropping the multi-region type descriptor. Then, +// it errors out the async job associated with the type descriptor cleanup and +// ensures the namespace entry is reclaimed despite the injected error. +// We rely on this behavior to be able to add multi-region capability in the +// future. +func TestDroppingPrimaryRegionAsyncJobFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Decrease the adopt loop interval so that retries happen quickly. + defer sqltestutils.SetTestJobsAdoptInterval()() + + params, _ := tests.CreateTestServerParams() + params.Locality.Tiers = []roachpb.Tier{ + {Key: "region", Value: "us-east-1"}, + } + + // Protects haveWePerformedFirstRoundOfCleanup. + var mu syncutil.Mutex + // We need to clean up two times: once for the multi-region type descriptor + // and once for the array alias of the multi-region type descriptor. + haveWePerformedFirstRoundOfCleanup := false + cleanupFinished := make(chan struct{}) + params.Knobs.SQLTypeSchemaChanger = &sql.TypeSchemaChangerTestingKnobs{ + RunBeforeExec: func() error { + return errors.New("yikes") + }, + RunAfterOnFailOrCancel: func() error { + mu.Lock() + defer mu.Unlock() + if haveWePerformedFirstRoundOfCleanup { + close(cleanupFinished) + } + haveWePerformedFirstRoundOfCleanup = true + return nil + }, + } + + s, sqlDB, _ := serverutils.StartServer(t, params) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + // Set up the test.
+ _, err := sqlDB.Exec(` +CREATE DATABASE db WITH PRIMARY REGION "us-east-1"; +CREATE TABLE db.t(k INT) LOCALITY REGIONAL BY TABLE IN PRIMARY REGION; +`) + require.NoError(t, err) + + _, err = sqlDB.Exec(`ALTER DATABASE db DROP REGION "us-east-1"`) + require.True(t, testutils.IsError(err, "yikes")) + + <-cleanupFinished + + rows := sqlDB.QueryRow(`SELECT count(*) FROM system.namespace WHERE name = 'crdb_internal_region'`) + var count int + err = rows.Scan(&count) + require.NoError(t, err) + if count != 0 { + t.Fatal("expected crdb_internal_region not to be present in system.namespace") + } + + _, err = sqlDB.Exec(`ALTER DATABASE db PRIMARY REGION "us-east-1"`) + require.NoError(t, err) + + rows = sqlDB.QueryRow(`SELECT count(*) FROM system.namespace WHERE name = 'crdb_internal_region'`) + err = rows.Scan(&count) + require.NoError(t, err) + if count != 1 { + t.Fatal("expected crdb_internal_region to be present in system.namespace") + } +} diff --git a/pkg/sql/opt/optbuilder/BUILD.bazel b/pkg/sql/opt/optbuilder/BUILD.bazel index d33bb967c07e..372984786d2a 100644 --- a/pkg/sql/opt/optbuilder/BUILD.bazel +++ b/pkg/sql/opt/optbuilder/BUILD.bazel @@ -82,6 +82,7 @@ go_test( srcs = [ "builder_test.go", "name_resolution_test.go", + "union_test.go", ], data = glob(["testdata/**"]), embed = [":optbuilder"], @@ -96,6 +97,7 @@ go_test( "//pkg/sql/parser", "//pkg/sql/sem/builtins", "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/testutils/sqlutils", "//pkg/util/leaktest", "@com_github_cockroachdb_datadriven//:datadriven", diff --git a/pkg/sql/opt/optbuilder/testdata/union b/pkg/sql/opt/optbuilder/testdata/union index cadb97740dfa..f2ffbdce4f6f 100644 --- a/pkg/sql/opt/optbuilder/testdata/union +++ b/pkg/sql/opt/optbuilder/testdata/union @@ -874,3 +874,128 @@ except │ │ └── 1 [as="?column?":6] │ └── 1 └── (1,) + +# Verify that we add casts for equivalent, but not identical types.
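+# For example, INT4 and INT8 are equivalent but not identical: in the plans
+# below, the narrower input is wrapped in a projection that casts it to the
+# wider type (INT4 -> INT8, FLOAT4 -> FLOAT8).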
+exec-ddl +CREATE TABLE ab (i8 INT8, i4 INT4, f8 FLOAT, f4 FLOAT4, d DECIMAL) +---- + +build +SELECT i4 FROM ab UNION SELECT i8 FROM ab +---- +union + ├── columns: i4:16 + ├── left columns: i4:15 + ├── right columns: i8:8 + ├── project + │ ├── columns: i4:15 + │ ├── project + │ │ ├── columns: ab.i4:2 + │ │ └── scan ab + │ │ └── columns: i8:1 ab.i4:2 f8:3 f4:4 d:5 rowid:6!null crdb_internal_mvcc_timestamp:7 + │ └── projections + │ └── ab.i4:2::INT8 [as=i4:15] + └── project + ├── columns: i8:8 + └── scan ab + └── columns: i8:8 ab.i4:9 f8:10 f4:11 d:12 rowid:13!null crdb_internal_mvcc_timestamp:14 + +build +SELECT i8 FROM ab UNION SELECT i4 FROM ab +---- +union + ├── columns: i8:16 + ├── left columns: ab.i8:1 + ├── right columns: i4:15 + ├── project + │ ├── columns: ab.i8:1 + │ └── scan ab + │ └── columns: ab.i8:1 ab.i4:2 f8:3 f4:4 d:5 rowid:6!null crdb_internal_mvcc_timestamp:7 + └── project + ├── columns: i4:15 + ├── project + │ ├── columns: ab.i4:9 + │ └── scan ab + │ └── columns: ab.i8:8 ab.i4:9 f8:10 f4:11 d:12 rowid:13!null crdb_internal_mvcc_timestamp:14 + └── projections + └── ab.i4:9::INT8 [as=i4:15] + +build +SELECT f4 FROM ab UNION SELECT f8 FROM ab +---- +union + ├── columns: f4:16 + ├── left columns: f4:15 + ├── right columns: f8:10 + ├── project + │ ├── columns: f4:15 + │ ├── project + │ │ ├── columns: ab.f4:4 + │ │ └── scan ab + │ │ └── columns: i8:1 i4:2 f8:3 ab.f4:4 d:5 rowid:6!null crdb_internal_mvcc_timestamp:7 + │ └── projections + │ └── ab.f4:4::FLOAT8 [as=f4:15] + └── project + ├── columns: f8:10 + └── scan ab + └── columns: i8:8 i4:9 f8:10 ab.f4:11 d:12 rowid:13!null crdb_internal_mvcc_timestamp:14 + +build +SELECT i8 FROM ab UNION SELECT f4 FROM ab +---- +union + ├── columns: i8:16 + ├── left columns: i8:15 + ├── right columns: f4:11 + ├── project + │ ├── columns: i8:15 + │ ├── project + │ │ ├── columns: ab.i8:1 + │ │ └── scan ab + │ │ └── columns: ab.i8:1 i4:2 f8:3 f4:4 d:5 rowid:6!null crdb_internal_mvcc_timestamp:7 + │ └── projections + │ └── ab.i8:1::FLOAT4 [as=i8:15] + └── project + ├── columns: f4:11 + └── scan ab + └── columns: ab.i8:8 i4:9 f8:10 f4:11 d:12 rowid:13!null crdb_internal_mvcc_timestamp:14 + +build +SELECT i8 FROM ab UNION SELECT d FROM ab +---- +union + ├── columns: i8:16 + ├── left columns: i8:15 + ├── right columns: d:12 + ├── project + │ ├── columns: i8:15 + │ ├── project + │ │ ├── columns: ab.i8:1 + │ │ └── scan ab + │ │ └── columns: ab.i8:1 i4:2 f8:3 f4:4 d:5 rowid:6!null crdb_internal_mvcc_timestamp:7 + │ └── projections + │ └── ab.i8:1::DECIMAL [as=i8:15] + └── project + ├── columns: d:12 + └── scan ab + └── columns: ab.i8:8 i4:9 f8:10 f4:11 d:12 rowid:13!null crdb_internal_mvcc_timestamp:14 + +build +SELECT d FROM ab UNION SELECT f8 FROM ab +---- +union + ├── columns: d:16 + ├── left columns: ab.d:5 + ├── right columns: f8:15 + ├── project + │ ├── columns: ab.d:5 + │ └── scan ab + │ └── columns: i8:1 i4:2 ab.f8:3 f4:4 ab.d:5 rowid:6!null crdb_internal_mvcc_timestamp:7 + └── project + ├── columns: f8:15 + ├── project + │ ├── columns: ab.f8:10 + │ └── scan ab + │ └── columns: i8:8 i4:9 ab.f8:10 f4:11 ab.d:12 rowid:13!null crdb_internal_mvcc_timestamp:14 + └── projections + └── ab.f8:10::DECIMAL [as=f8:15] diff --git a/pkg/sql/opt/optbuilder/testdata/with b/pkg/sql/opt/optbuilder/testdata/with index b1a36b8dfe47..754102b6a61e 100644 --- a/pkg/sql/opt/optbuilder/testdata/with +++ b/pkg/sql/opt/optbuilder/testdata/with @@ -1327,6 +1327,12 @@ with &3 (cte) └── mapping: └── a:5 => a:8 +# We don't support upcasting the "initial" query. 
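+# The initial query fixes the column types of the recursive CTE, so a UNION
+# that would require casting the initial side (here int to float) errors out
+# instead of adding a cast.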
+build +WITH RECURSIVE cte(x) AS (SELECT a FROM x UNION ALL SELECT x::FLOAT FROM cte WHERE x < 10) SELECT * FROM cte; +---- +error (42804): UNION types int and float cannot be matched for WITH RECURSIVE + # Mutating WITHs not allowed at non-root positions. build SELECT * FROM (WITH foo AS (INSERT INTO y VALUES (1) RETURNING *) SELECT * FROM foo) diff --git a/pkg/sql/opt/optbuilder/union.go b/pkg/sql/opt/optbuilder/union.go index ab6d1b3284fc..742818cc6ecf 100644 --- a/pkg/sql/opt/optbuilder/union.go +++ b/pkg/sql/opt/optbuilder/union.go @@ -37,7 +37,7 @@ func (b *Builder) buildUnionClause( } func (b *Builder) buildSetOp( - typ tree.UnionType, all bool, inScope, leftScope, rightScope *scope, + unionType tree.UnionType, all bool, inScope, leftScope, rightScope *scope, ) (outScope *scope) { // Remove any hidden columns, as they are not included in the Union. leftScope.removeHiddenCols() @@ -45,33 +45,22 @@ func (b *Builder) buildSetOp( outScope = inScope.push() - // propagateTypesLeft/propagateTypesRight indicate whether we need to wrap - // the left/right side in a projection to cast some of the columns to the - // correct type. - // For example: - // SELECT NULL UNION SELECT 1 - // The type of NULL is unknown, and the type of 1 is int. We need to - // wrap the left side in a project operation with a Cast expression so the - // output column will have the correct type. - propagateTypesLeft, propagateTypesRight := b.checkTypesMatch( - leftScope, rightScope, - true, /* tolerateUnknownLeft */ - true, /* tolerateUnknownRight */ - typ.String(), + setOpTypes, leftCastsNeeded, rightCastsNeeded := b.typeCheckSetOp( + leftScope, rightScope, unionType.String(), ) - if propagateTypesLeft { - leftScope = b.propagateTypes(leftScope /* dst */, rightScope /* src */) + if leftCastsNeeded { + leftScope = b.addCasts(leftScope /* dst */, setOpTypes) } - if propagateTypesRight { - rightScope = b.propagateTypes(rightScope /* dst */, leftScope /* src */) + if rightCastsNeeded { + rightScope = b.addCasts(rightScope /* dst */, setOpTypes) } // For UNION, we have to synthesize new output columns (because they contain // values from both the left and right relations). This is not necessary for // INTERSECT or EXCEPT, since these operations are basically filters on the // left relation. - if typ == tree.UnionOp { + if unionType == tree.UnionOp { outScope.cols = make([]scopeColumn, 0, len(leftScope.cols)) for i := range leftScope.cols { c := &leftScope.cols[i] @@ -92,7 +81,7 @@ func (b *Builder) buildSetOp( private := memo.SetPrivate{LeftCols: leftCols, RightCols: rightCols, OutCols: newCols} if all { - switch typ { + switch unionType { case tree.UnionOp: outScope.expr = b.factory.ConstructUnionAll(left, right, &private) case tree.IntersectOp: @@ -101,7 +90,7 @@ func (b *Builder) buildSetOp( outScope.expr = b.factory.ConstructExceptAll(left, right, &private) } } else { - switch typ { + switch unionType { case tree.UnionOp: outScope.expr = b.factory.ConstructUnion(left, right, &private) case tree.IntersectOp: @@ -114,25 +103,17 @@ func (b *Builder) buildSetOp( return outScope } -// checkTypesMatch is used when the columns must match between two scopes (e.g. -// for a UNION). Throws an error if the scopes don't have the same number of -// columns, or when column types don't match 1-1, except: -// - if tolerateUnknownLeft is set and the left column has Unknown type while -// the right has a known type (in this case it returns propagateToLeft=true). 
-// - if tolerateUnknownRight is set and the right column has Unknown type while -// the right has a known type (in this case it returns propagateToRight=true). +// typeCheckSetOp cross-checks the types between the left and right sides of a +// set operation and determines the output types. Either side (or both) might +// need casts (as indicated in the return values). // -// clauseTag is used only in error messages. +// Throws an error if the scopes don't have the same number of columns, or when +// column types don't match 1-1 or can't be cast to a single output type. The +// error messages use clauseTag. // -// TODO(dan): This currently checks whether the types are exactly the same, -// but Postgres is more lenient: -// http://www.postgresql.org/docs/9.5/static/typeconv-union-case.html. -func (b *Builder) checkTypesMatch( - leftScope, rightScope *scope, - tolerateUnknownLeft bool, - tolerateUnknownRight bool, - clauseTag string, -) (propagateToLeft, propagateToRight bool) { +func (b *Builder) typeCheckSetOp( + leftScope, rightScope *scope, clauseTag string, +) (setOpTypes []*types.T, leftCastsNeeded, rightCastsNeeded bool) { if len(leftScope.cols) != len(rightScope.cols) { panic(pgerror.Newf( pgcode.Syntax, @@ -141,38 +122,79 @@ func (b *Builder) checkTypesMatch( )) } + setOpTypes = make([]*types.T, len(leftScope.cols)) for i := range leftScope.cols { l := &leftScope.cols[i] r := &rightScope.cols[i] - if l.typ.Equivalent(r.typ) { - continue - } + typ := determineUnionType(l.typ, r.typ, clauseTag) + setOpTypes[i] = typ + leftCastsNeeded = leftCastsNeeded || !l.typ.Identical(typ) + rightCastsNeeded = rightCastsNeeded || !r.typ.Identical(typ) + } + return setOpTypes, leftCastsNeeded, rightCastsNeeded +} - // Note that Unknown types are equivalent so at this point at most one of - // the types can be Unknown. - if l.typ.Family() == types.UnknownFamily && tolerateUnknownLeft { - propagateToLeft = true - continue +// determineUnionType determines the resulting type of a set operation on a +// column with the given left and right types. +// +// We allow implicit up-casts between types of the same numeric family with +// different widths; between int and float; and between int/float and decimal. +// +// Throws an error if we don't support a set operation between the two types. +func determineUnionType(left, right *types.T, clauseTag string) *types.T { + if left.Identical(right) { + return left + } + + if left.Equivalent(right) { + // Do a best-effort attempt to determine which type is "larger". + if left.Width() > right.Width() { + return left } - if r.typ.Family() == types.UnknownFamily && tolerateUnknownRight { - propagateToRight = true - continue + if left.Width() < right.Width() { + return right } + // In other cases, use the left type. + return left + } + leftFam, rightFam := left.Family(), right.Family() - panic(pgerror.Newf( - pgcode.DatatypeMismatch, - "%v types %s and %s cannot be matched", clauseTag, l.typ, r.typ, - )) + if rightFam == types.UnknownFamily { + return left + } + if leftFam == types.UnknownFamily { + return right + } + + // Allow implicit upcast from int to float. Converting an int to float can be + // lossy (especially INT8 to FLOAT4), but this is what Postgres does. + if leftFam == types.FloatFamily && rightFam == types.IntFamily { + return left + } + if leftFam == types.IntFamily && rightFam == types.FloatFamily { + return right } - return propagateToLeft, propagateToRight + + // Allow implicit upcasts to decimal. 
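+ // For example, INT8 UNION DECIMAL and FLOAT8 UNION DECIMAL both resolve
+ // to DECIMAL (see the unit tests in union_test.go below).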
+ if leftFam == types.DecimalFamily && (rightFam == types.IntFamily || rightFam == types.FloatFamily) { + return left + } + if (leftFam == types.IntFamily || leftFam == types.FloatFamily) && rightFam == types.DecimalFamily { + return right + } + + // TODO(radu): Postgres has more encompassing rules: + // http://www.postgresql.org/docs/12/static/typeconv-union-case.html + panic(pgerror.Newf( + pgcode.DatatypeMismatch, + "%v types %s and %s cannot be matched", clauseTag, left, right, + )) } -// propagateTypes propagates the types of the source columns to the destination -// columns by wrapping the destination in a Project operation. The Project -// operation passes through columns that already have the correct type, and -// creates cast expressions for those that don't. -func (b *Builder) propagateTypes(dst, src *scope) *scope { +// addCasts adds a projection to a scope, adding casts as necessary so that the +// resulting columns have the given types. +func (b *Builder) addCasts(dst *scope, outTypes []*types.T) *scope { expr := dst.expr.(memo.RelExpr) dstCols := dst.cols @@ -180,12 +202,10 @@ func (b *Builder) propagateTypes(dst, src *scope) *scope { dst.cols = make([]scopeColumn, 0, len(dstCols)) for i := 0; i < len(dstCols); i++ { - dstType := dstCols[i].typ - srcType := src.cols[i].typ - if dstType.Family() == types.UnknownFamily && srcType.Family() != types.UnknownFamily { + if !dstCols[i].typ.Identical(outTypes[i]) { // Create a new column which casts the old column to the correct type. - castExpr := b.factory.ConstructCast(b.factory.ConstructVariable(dstCols[i].id), srcType) - b.synthesizeColumn(dst, string(dstCols[i].name), srcType, nil /* expr */, castExpr) + castExpr := b.factory.ConstructCast(b.factory.ConstructVariable(dstCols[i].id), outTypes[i]) + b.synthesizeColumn(dst, string(dstCols[i].name), outTypes[i], nil /* expr */, castExpr) } else { // The column is already the correct type, so add it as a passthrough // column. diff --git a/pkg/sql/opt/optbuilder/union_test.go b/pkg/sql/opt/optbuilder/union_test.go new file mode 100644 index 000000000000..112d5159e3a0 --- /dev/null +++ b/pkg/sql/opt/optbuilder/union_test.go @@ -0,0 +1,98 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package optbuilder + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/sql/types" +) + +func TestUnionType(t *testing.T) { + testCases := []struct { + left, right, expected *types.T + }{ + { + left: types.Unknown, + right: types.Int, + expected: types.Int, + }, + { + left: types.Int, + right: types.Unknown, + expected: types.Int, + }, + { + left: types.Int4, + right: types.Int, + expected: types.Int, + }, + { + left: types.Int4, + right: types.Int2, + expected: types.Int4, + }, + { + left: types.Float4, + right: types.Float, + expected: types.Float, + }, + { + left: types.MakeDecimal(12 /* precision */, 5 /* scale */), + right: types.MakeDecimal(10 /* precision */, 7 /* scale */), + expected: types.MakeDecimal(10 /* precision */, 7 /* scale */), + }, + { + // At the same scale, we use the left type. 
+ left: types.MakeDecimal(10 /* precision */, 1 /* scale */), + right: types.MakeDecimal(12 /* precision */, 1 /* scale */), + expected: types.MakeDecimal(10 /* precision */, 1 /* scale */), + }, + { + left: types.Int4, + right: types.Decimal, + expected: types.Decimal, + }, + { + left: types.Decimal, + right: types.Float, + expected: types.Decimal, + }, + { + // Error. + left: types.Float, + right: types.String, + expected: nil, + }, + } + + for _, tc := range testCases { + result := func() *types.T { + defer func() { + // Swallow any error and return nil. + _ = recover() + }() + return determineUnionType(tc.left, tc.right, "test") + }() + toStr := func(t *types.T) string { + if t == nil { + return "" + } + return t.SQLString() + } + if toStr(result) != toStr(tc.expected) { + t.Errorf( + "left: %s right: %s expected: %s got: %s", + toStr(tc.left), toStr(tc.right), toStr(tc.expected), toStr(result), + ) + } + } +} diff --git a/pkg/sql/opt/optbuilder/with.go b/pkg/sql/opt/optbuilder/with.go index e53e569c85f7..9721587b767d 100644 --- a/pkg/sql/opt/optbuilder/with.go +++ b/pkg/sql/opt/optbuilder/with.go @@ -186,13 +186,21 @@ func (b *Builder) buildCTE( // We allow propagation of types from the initial query to the recursive // query. - _, propagateToRight := b.checkTypesMatch(initialScope, recursiveScope, - false, /* tolerateUnknownLeft */ - true, /* tolerateUnknownRight */ - "UNION", - ) - if propagateToRight { - recursiveScope = b.propagateTypes(recursiveScope /* dst */, initialScope /* src */) + outTypes, leftCastsNeeded, rightCastsNeeded := b.typeCheckSetOp(initialScope, recursiveScope, "UNION") + if leftCastsNeeded { + // We don't support casts on the initial expression; error out. + for i := range outTypes { + if !outTypes[i].Identical(initialScope.cols[i].typ) { + panic(pgerror.Newf( + pgcode.DatatypeMismatch, + "UNION types %s and %s cannot be matched for WITH RECURSIVE", + initialScope.cols[i].typ, recursiveScope.cols[i].typ, + )) + } + } + } + if rightCastsNeeded { + recursiveScope = b.addCasts(recursiveScope, outTypes) } private := memo.RecursiveCTEPrivate{ diff --git a/pkg/sql/opt/xform/testdata/rules/select b/pkg/sql/opt/xform/testdata/rules/select index 2faca1cd58f3..e1fe4b07a71a 100644 --- a/pkg/sql/opt/xform/testdata/rules/select +++ b/pkg/sql/opt/xform/testdata/rules/select @@ -127,7 +127,8 @@ CREATE TABLE virtual ( upper_s STRING AS (upper(s)) VIRTUAL, INDEX (a) WHERE c IN (10, 20, 30), INDEX (lower_s), - INDEX (a) WHERE upper_s = 'FOO' + INDEX (a) WHERE upper_s = 'FOO', + INDEX (c) WHERE c > 100 ) ---- @@ -376,8 +377,10 @@ project │ ├── partial index predicates │ │ ├── secondary: filters │ │ │ └── (b:3 + 10) IN (10, 20, 30) [outer=(3), immutable] + │ │ ├── secondary: filters + │ │ │ └── upper(s:5) = 'FOO' [outer=(5), immutable] │ │ └── secondary: filters - │ │ └── upper(s:5) = 'FOO' [outer=(5), immutable] + │ │ └── b:3 > 90 [outer=(3), constraints=(/3: [/91 - ]; tight)] │ ├── key: (1) │ └── fd: (1)-->(3) └── filters @@ -1724,8 +1727,10 @@ project │ ├── partial index predicates │ │ ├── secondary: filters │ │ │ └── (b:3 + 10) IN (10, 20, 30) [outer=(3), immutable] + │ │ ├── secondary: filters + │ │ │ └── upper(s:5) = 'FOO' [outer=(5), immutable] │ │ └── secondary: filters - │ │ └── upper(s:5) = 'FOO' [outer=(5), immutable] + │ │ └── b:3 > 90 [outer=(3), constraints=(/3: [/91 - ]; tight)] │ ├── key: (1) │ └── fd: (1)-->(2,3) └── filters @@ -1747,9 +1752,60 @@ project ├── key: (1) └── fd: (1)-->(2) +# Constrained partial index scan with an indexed virtual computed column also 
in +# the predicate. +# TODO(mgartner): Teach indexConstraintCtx.simplifyFilter to remove b=140 from +# the remaining filters after the index constraint is built. This will eliminate +# the unnecessary index-join and select expressions. +opt +SELECT k FROM virtual WHERE c = 150 +---- +project + ├── columns: k:1!null + ├── key: (1) + └── select + ├── columns: k:1!null b:3!null + ├── key: (1) + ├── fd: ()-->(3) + ├── index-join virtual + │ ├── columns: k:1!null b:3 + │ ├── key: (1) + │ ├── fd: (1)-->(3) + │ └── scan virtual@secondary,partial + │ ├── columns: k:1!null + │ ├── constraint: /4/1: [/150 - /150] + │ └── key: (1) + └── filters + └── b:3 = 140 [outer=(3), constraints=(/3: [/140 - /140]; tight), fd=()-->(3)] + +# Constrain a scan on a partial index on virtual column c constraining the +# dependent column b. +# TODO(mgartner): Teach indexConstraintCtx.simplifyFilter to remove b=140 from +# the remaining filters after the index constraint is built. This will eliminate +# the unnecessary index-join and select expressions. +opt +SELECT k FROM virtual WHERE b = 140 +---- +project + ├── columns: k:1!null + ├── key: (1) + └── select + ├── columns: k:1!null b:3!null + ├── key: (1) + ├── fd: ()-->(3) + ├── index-join virtual + │ ├── columns: k:1!null b:3 + │ ├── key: (1) + │ ├── fd: (1)-->(3) + │ └── scan virtual@secondary,partial + │ ├── columns: k:1!null + │ ├── constraint: /4/1: [/150 - /150] + │ └── key: (1) + └── filters + └── b:3 = 140 [outer=(3), constraints=(/3: [/140 - /140]; tight), fd=()-->(3)] + # Regression test for #55387. GenerateConstrainedScans should not reduce the # input filters when proving partial index implication. - exec-ddl CREATE TABLE t55387 ( k INT PRIMARY KEY, diff --git a/pkg/sql/parser/lexer.go b/pkg/sql/parser/lexer.go index 62f55940d1f2..73870c7b8991 100644 --- a/pkg/sql/parser/lexer.go +++ b/pkg/sql/parser/lexer.go @@ -153,36 +153,52 @@ func (l *lexer) UpdateNumPlaceholders(p *tree.Placeholder) { } } -// Unimplemented wraps Error, setting lastUnimplementedError. -func (l *lexer) Unimplemented(feature string) { - l.lastError = unimp.New(feature, "this syntax") +// PurposelyUnimplemented wraps Error, setting lastUnimplementedError. +func (l *lexer) PurposelyUnimplemented(feature string, reason string) { + // We purposely do not use unimp here, as it appends hints to suggest that + // the error may be actively tracked as a bug. + l.lastError = errors.WithHint( + errors.WithTelemetry( + pgerror.Newf(pgcode.Syntax, "unimplemented: this syntax"), + fmt.Sprintf("sql.purposely_unimplemented.%s", feature), + ), + reason, + ) l.populateErrorDetails() + l.lastError = &tree.UnsupportedError{ + Err: l.lastError, + FeatureName: feature, + } } // UnimplementedWithIssue wraps Error, setting lastUnimplementedError. func (l *lexer) UnimplementedWithIssue(issue int) { l.lastError = unimp.NewWithIssue(issue, "this syntax") l.populateErrorDetails() + l.lastError = &tree.UnsupportedError{ + Err: l.lastError, + FeatureName: fmt.Sprintf("https://github.com/cockroachdb/cockroach/issues/%d", issue), + } } // UnimplementedWithIssueDetail wraps Error, setting lastUnimplementedError. func (l *lexer) UnimplementedWithIssueDetail(issue int, detail string) { l.lastError = unimp.NewWithIssueDetail(issue, detail, "this syntax") l.populateErrorDetails() + l.lastError = &tree.UnsupportedError{ + Err: l.lastError, + FeatureName: detail, + } } -// PurposelyUnimplemented wraps Error, setting lastUnimplementedError. 
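+
+// Note: each of the wrappers above re-wraps lastError in tree.UnsupportedError
+// so that IMPORT PGDUMP can recognize unimplemented statements and, when
+// configured to ignore them, skip and log them instead of failing the import.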
-func (l *lexer) PurposelyUnimplemented(feature string, reason string) { - // We purposely do not use unimp here, as it appends hints to suggest that - // the error may be actively tracked as a bug. - l.lastError = errors.WithHint( - errors.WithTelemetry( - pgerror.Newf(pgcode.Syntax, "unimplemented: this syntax"), - fmt.Sprintf("sql.purposely_unimplemented.%s", feature), - ), - reason, - ) +// Unimplemented wraps Error, setting lastUnimplementedError. +func (l *lexer) Unimplemented(feature string) { + l.lastError = unimp.New(feature, "this syntax") l.populateErrorDetails() + l.lastError = &tree.UnsupportedError{ + Err: l.lastError, + FeatureName: feature, + } } // setErr is called from parsing action rules to register an error observed diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index fc5a0e5e5565..ccaebfbae7aa 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -3214,9 +3214,12 @@ func TestUnimplementedSyntax(t *testing.T) { {`ALTER TABLE a ADD CONSTRAINT foo EXCLUDE USING gist (bar WITH =)`, 46657, `add constraint exclude using`, ``}, {`ALTER TABLE a INHERITS b`, 22456, `alter table inherits`, ``}, {`ALTER TABLE a NO INHERITS b`, 22456, `alter table no inherits`, ``}, + {`ALTER FUNCTION public.isnumeric(text) OWNER TO bob`, 0, `alter function`, ``}, {`CREATE ACCESS METHOD a`, 0, `create access method`, ``}, + {`COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language'`, 0, `comment on extension`, ``}, + {`COPY x FROM STDIN WHERE a = b`, 54580, ``, ``}, {`CREATE AGGREGATE a`, 0, `create aggregate`, ``}, @@ -3224,6 +3227,7 @@ func TestUnimplementedSyntax(t *testing.T) { {`CREATE CONSTRAINT TRIGGER a`, 28296, `create constraint`, ``}, {`CREATE CONVERSION a`, 0, `create conversion`, ``}, {`CREATE DEFAULT CONVERSION a`, 0, `create def conv`, ``}, + {`CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog`, 0, `create extension if not exists with`, ``}, {`CREATE FOREIGN DATA WRAPPER a`, 0, `create fdw`, ``}, {`CREATE FOREIGN TABLE a`, 0, `create foreign table`, ``}, {`CREATE FUNCTION a`, 17511, `create`, ``}, @@ -3262,6 +3266,9 @@ func TestUnimplementedSyntax(t *testing.T) { {`DISCARD TEMP`, 0, `discard temp`, ``}, {`DISCARD TEMPORARY`, 0, `discard temp`, ``}, + {`GRANT ALL ON SEQUENCE`, 0, `grant privileges on sequence`, ``}, + {`REVOKE ALL ON SEQUENCE`, 0, `revoke privileges on sequence`, ``}, + {`SET CONSTRAINTS foo`, 0, `set constraints`, ``}, {`SET LOCAL foo = bar`, 32562, ``, ``}, {`SET foo FROM CURRENT`, 0, `set from current`, ``}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 58b048f3d7ee..08f5281eb8d5 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -70,6 +70,8 @@ func unimplementedWithIssueDetail(sqllex sqlLexer, issue int, detail string) int sqllex.(*lexer).UnimplementedWithIssueDetail(issue, detail) return 1 } + + %} %{ @@ -729,6 +731,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList { %type stmt_block %type stmt + %type alter_stmt %type alter_ddl_stmt %type alter_table_stmt @@ -741,6 +744,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList { %type alter_role_stmt %type alter_type_stmt %type alter_schema_stmt +%type alter_unsupported_stmt // ALTER RANGE %type alter_zone_range_stmt @@ -1332,6 +1336,7 @@ stmt: alter_stmt: alter_ddl_stmt // help texts in sub-rule | alter_role_stmt // EXTEND WITH HELP: ALTER ROLE +| alter_unsupported_stmt | ALTER error // SHOW HELP: ALTER alter_ddl_stmt: @@ -2732,6 +2737,13 @@ import_format: $$ = 
strings.ToUpper($1) } +alter_unsupported_stmt: + ALTER FUNCTION error + { + return unimplemented(sqllex, "alter function") + } + + // %Help: IMPORT - load data from file in a distributed manner // %Category: CCL // %Text: @@ -3099,6 +3111,7 @@ comment_stmt: { $$.val = &tree.CommentOnIndex{Index: $4.tableIndexName(), Comment: $6.strPtr()} } +| COMMENT ON EXTENSION error { return unimplemented(sqllex, "comment on extension") } comment_text: SCONST @@ -3140,6 +3153,7 @@ create_extension_stmt: | CREATE EXTENSION name { $$.val = &tree.CreateExtension{Name: $3} } +| CREATE EXTENSION IF NOT EXISTS name WITH error { return unimplemented(sqllex, "create extension if not exists with") } | CREATE EXTENSION error // SHOW HELP: CREATE EXTENSION create_unsupported: @@ -3161,7 +3175,7 @@ create_unsupported: | CREATE SUBSCRIPTION error { return unimplemented(sqllex, "create subscription") } | CREATE TABLESPACE error { return unimplementedWithIssueDetail(sqllex, 54113, "create tablespace") } | CREATE TEXT error { return unimplementedWithIssueDetail(sqllex, 7821, "create text") } -| CREATE TRIGGER error { return unimplementedWithIssueDetail(sqllex, 28296, "create") } +| CREATE TRIGGER error { return unimplementedWithIssueDetail(sqllex, 28296, "create trigger") } opt_or_replace: OR REPLACE {} @@ -3964,6 +3978,10 @@ grant_stmt: Grantees: $7.nameList(), } } +| GRANT privileges ON SEQUENCE error + { + return unimplemented(sqllex, "grant privileges on sequence") + } | GRANT error // SHOW HELP: GRANT // %Help: REVOKE - remove access privileges and role memberships @@ -4011,6 +4029,10 @@ revoke_stmt: Grantees: $7.nameList(), } } +| REVOKE privileges ON SEQUENCE error + { + return unimplemented(sqllex, "revoke privileges on sequence") + } | REVOKE error // SHOW HELP: REVOKE // ALL can either be by itself, or with the optional PRIVILEGES keyword (which no-ops) diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go index bd68ccf20d53..2a27063ee4a7 100644 --- a/pkg/sql/region_util.go +++ b/pkg/sql/region_util.go @@ -454,11 +454,10 @@ func applyZoneConfigForMultiRegionTableOptionNewIndexes( } } -// isRegionalByRowPlaceholderZoneConfig returns whether a given zone config -// should be marked as a placeholder config if the zone config belongs to -// a REGIONAL BY ROW table. +// isPlaceholderZoneConfigForMultiRegion returns whether a given zone config +// should be marked as a placeholder config for a multi-region object. // See zonepb.IsSubzonePlaceholder for why this is necessary. -func isRegionalByRowPlaceholderZoneConfig(zc zonepb.ZoneConfig) bool { +func isPlaceholderZoneConfigForMultiRegion(zc zonepb.ZoneConfig) bool { // Strip Subzones / SubzoneSpans, as these may contain items if migrating // from one REGIONAL BY ROW table to another. 
strippedZC := zc @@ -484,11 +483,6 @@ func applyZoneConfigForMultiRegionTableOptionTableNewConfig( return false, zonepb.ZoneConfig{}, err } zc.CopyFromZone(*localityZoneConfig, multiRegionZoneConfigFields) - if newConfig.GetRegionalByRow() != nil { - if isRegionalByRowPlaceholderZoneConfig(zc) { - zc.NumReplicas = proto.Int32(0) - } - } return false, zc, nil } } @@ -513,10 +507,6 @@ var ApplyZoneConfigForMultiRegionTableOptionTableAndIndexes = func( hasNewSubzones := table.IsLocalityRegionalByRow() if hasNewSubzones { - if isRegionalByRowPlaceholderZoneConfig(zc) { - zc.NumReplicas = proto.Int32(0) - } - for _, region := range regionConfig.Regions { subzoneConfig, err := zoneConfigForMultiRegionPartition(region, regionConfig) if err != nil { @@ -568,6 +558,14 @@ func ApplyZoneConfigForMultiRegionTable( zoneConfig = newZoneConfig } + // Mark the NumReplicas as 0 if we have subzones but no other features + // in the zone config. This signifies a placeholder. + // Note we do not use hasNewSubzones here as there may be existing subzones + // on the zone config which may still be a placeholder. + if len(zoneConfig.Subzones) > 0 && isPlaceholderZoneConfigForMultiRegion(zoneConfig) { + zoneConfig.NumReplicas = proto.Int32(0) + } + if !zoneConfig.Equal(zonepb.NewZoneConfig()) { if err := zoneConfig.Validate(); err != nil { return pgerror.Newf( diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index 3b161e24344a..06cd229f69ac 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -88,6 +88,7 @@ go_library( "type_check.go", "type_name.go", "union.go", + "unsupported_error.go", "update.go", "values.go", "var_name.go", diff --git a/pkg/sql/sem/tree/unsupported_error.go b/pkg/sql/sem/tree/unsupported_error.go new file mode 100644 index 000000000000..ac91e1efc7fd --- /dev/null +++ b/pkg/sql/sem/tree/unsupported_error.go @@ -0,0 +1,31 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tree + +var _ error = &UnsupportedError{} + +// UnsupportedError is an error object returned when parsing certain +// unimplemented SQL statements. It is currently only used to skip over PGDUMP +// statements during an import. +type UnsupportedError struct { + Err error + FeatureName string +} + +func (u *UnsupportedError) Error() string { + return u.Err.Error() +} + +// Cause implements causer. +func (u *UnsupportedError) Cause() error { return u.Err } + +// Unwrap implements wrapper. +func (u *UnsupportedError) Unwrap() error { return u.Err } diff --git a/pkg/storage/intent_interleaving_iter.go b/pkg/storage/intent_interleaving_iter.go index 68291c11b52b..d9d65ee57fa9 100644 --- a/pkg/storage/intent_interleaving_iter.go +++ b/pkg/storage/intent_interleaving_iter.go @@ -13,6 +13,7 @@ package storage import ( "bytes" "fmt" + "math/rand" "sync" "github.com/cockroachdb/cockroach/pkg/keys" @@ -865,3 +866,72 @@ func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) M } return it } + +// unsafeMVCCIterator is used in RaceEnabled test builds to randomly inject +// changes to unsafe keys retrieved from MVCCIterators.
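+// It copies each unsafe return value into a private buffer and, on the next
+// seek or step call, randomly zeroes those buffers; a caller that illegally
+// retains an unsafe key across iteration then observes the mangled data and
+// fails loudly under race builds.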
+type unsafeMVCCIterator struct { + MVCCIterator + keyBuf []byte + rawKeyBuf []byte + rawMVCCKeyBuf []byte +} + +func wrapInUnsafeIter(iter MVCCIterator) MVCCIterator { + return &unsafeMVCCIterator{MVCCIterator: iter} +} + +var _ MVCCIterator = &unsafeMVCCIterator{} + +func (i *unsafeMVCCIterator) SeekGE(key MVCCKey) { + i.mangleBufs() + i.MVCCIterator.SeekGE(key) +} + +func (i *unsafeMVCCIterator) Next() { + i.mangleBufs() + i.MVCCIterator.Next() +} + +func (i *unsafeMVCCIterator) NextKey() { + i.mangleBufs() + i.MVCCIterator.NextKey() +} + +func (i *unsafeMVCCIterator) SeekLT(key MVCCKey) { + i.mangleBufs() + i.MVCCIterator.SeekLT(key) +} + +func (i *unsafeMVCCIterator) Prev() { + i.mangleBufs() + i.MVCCIterator.Prev() +} + +func (i *unsafeMVCCIterator) UnsafeKey() MVCCKey { + rv := i.MVCCIterator.UnsafeKey() + i.keyBuf = append(i.keyBuf[:0], rv.Key...) + rv.Key = i.keyBuf + return rv +} + +func (i *unsafeMVCCIterator) UnsafeRawKey() []byte { + rv := i.MVCCIterator.UnsafeRawKey() + i.rawKeyBuf = append(i.rawKeyBuf[:0], rv...) + return i.rawKeyBuf +} + +func (i *unsafeMVCCIterator) UnsafeRawMVCCKey() []byte { + rv := i.MVCCIterator.UnsafeRawMVCCKey() + i.rawMVCCKeyBuf = append(i.rawMVCCKeyBuf[:0], rv...) + return i.rawMVCCKeyBuf +} + +// mangleBufs zeroes, roughly half the time, the buffers previously handed out +// via the Unsafe* methods before the iterator moves to a new position. +func (i *unsafeMVCCIterator) mangleBufs() { + if rand.Intn(2) == 0 { + for _, b := range [3][]byte{i.keyBuf, i.rawKeyBuf, i.rawMVCCKeyBuf} { + for j := range b { + b[j] = 0 + } + } + } +} diff --git a/pkg/storage/intent_interleaving_iter_test.go b/pkg/storage/intent_interleaving_iter_test.go index b5caafea0c77..47e2896c77a7 100644 --- a/pkg/storage/intent_interleaving_iter_test.go +++ b/pkg/storage/intent_interleaving_iter_test.go @@ -312,7 +312,7 @@ func TestIntentInterleavingIter(t *testing.T) { if d.HasArg("prefix") { d.ScanArgs(t, "prefix", &opts.Prefix) } - iter := newIntentInterleavingIterator(eng, opts) + iter := wrapInUnsafeIter(newIntentInterleavingIterator(eng, opts)) var b strings.Builder defer iter.Close() // pos is the original : prefix computed by diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 93d22b04974e..9d9e3fb20dfe 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -293,7 +293,7 @@ type mvccClearTimeRangeOp struct { func (m mvccClearTimeRangeOp) run(ctx context.Context) string { writer := m.m.getReadWriter(m.writer) span, err := storage.MVCCClearTimeRange(ctx, writer, &enginepb.MVCCStats{}, m.key, m.endKey, - m.startTime, m.endTime, math.MaxInt64, true /* useTBI */) + m.startTime, m.endTime, math.MaxInt64, math.MaxInt64, true /* useTBI */) if err != nil { return fmt.Sprintf("error: %s", err) } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 7fd118e6cc3f..61819d3ff43f 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2076,6 +2076,11 @@ func MVCCMerge( // buffer of keys selected for deletion but not yet flushed (as done to detect // long runs for cleaning in a single ClearRange). // +// Limiting the number of keys or ranges of keys processed can still cause a +// batch that is too large -- in number of bytes -- for raft to replicate if the +// keys are very large. So if the total length of the keys or key spans cleared +// exceeds maxBatchByteSize it will also stop and return a resume span. +// // This function handles the stats computations to determine the correct // incremental deltas of clearing these keys (and correctly determining if it // does or does not change the live and gc keys).
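+// Callers that need the entire span cleared are expected to re-invoke
+// MVCCClearTimeRange with the returned resume span until it is nil (the
+// resumingClear helper in the tests below shows one way to do this).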
@@ -2088,10 +2093,11 @@ func MVCCClearTimeRange( ms *enginepb.MVCCStats, key, endKey roachpb.Key, startTime, endTime hlc.Timestamp, - maxBatchSize int64, + maxBatchSize, maxBatchByteSize int64, useTBI bool, ) (*roachpb.Span, error) { var batchSize int64 + var batchByteSize int64 var resume *roachpb.Span // When iterating, instead of immediately clearing a matching key we can @@ -2115,7 +2121,6 @@ func MVCCClearTimeRange( "MVCCStats passed in to MVCCClearTimeRange must be non-nil to ensure proper stats" + " computation during Clear operations") } - clearMatchingKey := func(k MVCCKey) { if len(clearRangeStart.Key) == 0 { // Currently buffering keys to clear one-by-one. @@ -2138,23 +2143,40 @@ func MVCCClearTimeRange( if err := rw.ClearMVCCRange(clearRangeStart, nonMatch); err != nil { return err } + batchByteSize += int64(clearRangeStart.EncodedSize() + nonMatch.EncodedSize()) batchSize++ clearRangeStart = MVCCKey{} } else if bufSize > 0 { + var encodedBufSize int64 for i := 0; i < bufSize; i++ { - if buf[i].Timestamp.IsEmpty() { - // Inline metadata. Not an intent because iteration below fails - // if it sees an intent. - if err := rw.ClearUnversioned(buf[i].Key); err != nil { - return err - } - } else { - if err := rw.ClearMVCC(buf[i]); err != nil { - return err + encodedBufSize += int64(buf[i].EncodedSize()) + } + // Even though we didn't get a large enough number of keys to switch to + // clearrange, the byte size of the keys we did get is now too large to + // encode them all within the byte size limit, so use clearrange anyway. + if batchByteSize+encodedBufSize >= maxBatchByteSize { + if err := rw.ClearMVCCRange(buf[0], nonMatch); err != nil { + return err + } + batchByteSize += int64(buf[0].EncodedSize() + nonMatch.EncodedSize()) + batchSize++ + } else { + for i := 0; i < bufSize; i++ { + if buf[i].Timestamp.IsEmpty() { + // Inline metadata. Not an intent because iteration below fails + // if it sees an intent. + if err := rw.ClearUnversioned(buf[i].Key); err != nil { + return err + } + } else { + if err := rw.ClearMVCC(buf[i]); err != nil { + return err + } } } + batchByteSize += encodedBufSize + batchSize += int64(bufSize) } - batchSize += int64(bufSize) bufSize = 0 } return nil @@ -2224,6 +2246,10 @@ func MVCCClearTimeRange( resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey} break } + if batchByteSize > maxBatchByteSize { + resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey} + break + } clearMatchingKey(k) clearedMetaKey.Key = append(clearedMetaKey.Key[:0], k.Key...) 
clearedMeta.KeyBytes = MVCCVersionTimestampSize diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 1afba5c1a32f..41edc781bcf0 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -2127,12 +2127,35 @@ func TestMVCCClearTimeRange(t *testing.T) { require.Equal(t, expected, res.KVs) } + const kb = 1024 + + resumingClear := func( + t *testing.T, + ctx context.Context, + rw ReadWriter, + ms *enginepb.MVCCStats, + key, endKey roachpb.Key, + ts, endTs hlc.Timestamp, + sz int64, + byteLimit int64, + useTBI bool, + ) int { + resume, err := MVCCClearTimeRange(ctx, rw, ms, key, endKey, ts, endTs, sz, byteLimit, useTBI) + require.NoError(t, err) + attempts := 1 + for resume != nil { + resume, err = MVCCClearTimeRange(ctx, rw, ms, resume.Key, resume.EndKey, ts, endTs, sz, byteLimit, useTBI) + require.NoError(t, err) + attempts++ + } + return attempts + } for _, useTBI := range []bool{true, false} { t.Run(fmt.Sprintf("useTBI-%t", useTBI), func(t *testing.T) { t.Run("clear > ts0", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts0, ts0Content) @@ -2143,9 +2166,29 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts1 ", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, + attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, kb, useTBI) + require.Equal(t, 1, attempts) + assertKVs(t, e, ts1, ts1Content) + assertKVs(t, e, ts2, ts1Content) + assertKVs(t, e, ts5, ts1Content) + }) + t.Run("clear > ts1 count-size batch", func(t *testing.T) { + e := setupKVs(t) + defer e.Close() + attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 1, kb, useTBI) - require.NoError(t, err) + require.Equal(t, 2, attempts) + assertKVs(t, e, ts1, ts1Content) + assertKVs(t, e, ts2, ts1Content) + assertKVs(t, e, ts5, ts1Content) + }) + + t.Run("clear > ts1 byte-size batch", func(t *testing.T) { + e := setupKVs(t) + defer e.Close() + attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, 1, + useTBI) + require.Equal(t, 2, attempts) assertKVs(t, e, ts1, ts1Content) assertKVs(t, e, ts2, ts1Content) assertKVs(t, e, ts5, ts1Content) @@ -2154,9 +2197,9 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts2", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10, + attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10, kb, useTBI) - require.NoError(t, err) + require.Equal(t, 1, attempts) assertKVs(t, e, ts2, ts2Content) assertKVs(t, e, ts5, ts2Content) }) @@ -2164,9 +2207,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts3", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, kb, useTBI) - require.NoError(t, err) assertKVs(t, e, ts3, ts3Content) assertKVs(t, e, ts5, ts3Content) }) @@ -2174,7 +2216,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts4 (nothing) ", func(t *testing.T) { e := setupKVs(t) 
defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10, kb, useTBI) require.NoError(t, err) assertKVs(t, e, ts4, ts4Content) @@ -2184,7 +2226,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts5 (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10, kb, useTBI) require.NoError(t, err) assertKVs(t, e, ts4, ts4Content) @@ -2194,9 +2236,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear up to k5 to ts0", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10, kb, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, []roachpb.KeyValue{{Key: testKey5, Value: v2}}) assertKVs(t, e, ts5, []roachpb.KeyValue{{Key: testKey5, Value: v4}}) }) @@ -2204,7 +2245,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts0 in empty span (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, kb, useTBI) require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) @@ -2214,7 +2255,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts0 in empty span [k3,k5) (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) @@ -2224,7 +2265,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear k3 and up in ts0 > x >= ts1 (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) @@ -2241,7 +2282,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear everything hitting intent fails", func(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10, useTBI) require.EqualError(t, err, "conflicting intents on \"/db3\"") }) @@ -2249,7 +2290,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear exactly hitting intent fails", func(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10, 1<<10, useTBI) require.EqualError(t, err, "conflicting intents on \"/db3\"") }) @@ -2257,9 +2298,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear everything above intent", func(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() - _, 
err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, kb, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) // Scan (< k3 to avoid intent) to confirm that k2 was indeed reverted to @@ -2283,9 +2323,8 @@ func TestMVCCClearTimeRange(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() assertKVs(t, e, ts2, ts2Content) - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10, kb, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, ts1Content) }) }) @@ -2387,7 +2426,11 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) { } reverts[0] = swathTime - 1 sort.Ints(reverts) - + const byteLimit = 1000 + const keyLimit = 100 + keyLen := int64(len(roachpb.Key(fmt.Sprintf("%05d", 1)))) + MVCCVersionTimestampSize + maxAttempts := (numKVs * keyLen) / byteLimit + var attempts int64 for i := len(reverts) - 1; i >= 0; i-- { for _, useTBI := range []bool{false, true} { t.Run(fmt.Sprintf("useTBI-%t revert-%d", useTBI, i), func(t *testing.T) { @@ -2399,8 +2442,9 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) { // Revert to the revert time. startKey := localMax for { + attempts++ resume, err := MVCCClearTimeRange(ctx, e, &ms, startKey, keyMax, revertTo, now, - 100, useTBI) + keyLimit, byteLimit, useTBI) require.NoError(t, err) if resume == nil { break @@ -2417,6 +2461,7 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) { }) } } + require.LessOrEqual(t, attempts, maxAttempts) }) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 995ffd561316..358313330404 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/fs" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -720,12 +721,18 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt // Doing defer r.Free() does not inline. iter := r.NewMVCCIterator(iterKind, opts) r.Free() + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } return iter } - iter := newPebbleIterator(p.db, nil, opts) + iter := MVCCIterator(newPebbleIterator(p.db, nil, opts)) if iter == nil { panic("couldn't create a new iterator") } + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } return iter } @@ -1339,12 +1346,19 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions // Doing defer r.Free() does not inline. iter := r.NewMVCCIterator(iterKind, opts) r.Free() + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } return iter } if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. 
- return newPebbleIterator(p.parent.db, nil, opts) + iter := MVCCIterator(newPebbleIterator(p.parent.db, nil, opts)) + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } + return iter } iter := &p.normalIter @@ -1369,7 +1383,11 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions } iter.inuse = true - return iter + var rv MVCCIterator = iter + if util.RaceEnabled { + rv = wrapInUnsafeIter(rv) + } + return rv } // NewEngineIterator implements the Engine interface. @@ -1582,9 +1600,16 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions // Doing defer r.Free() does not inline. iter := r.NewMVCCIterator(iterKind, opts) r.Free() + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } return iter } - return newPebbleIterator(p.snapshot, nil, opts) + iter := MVCCIterator(newPebbleIterator(p.snapshot, nil, opts)) + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } + return iter } // NewEngineIterator implements the Reader interface. diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index c9beb2734d48..3e73b3d6256e 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -206,12 +207,19 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M // Doing defer r.Free() does not inline. iter := r.NewMVCCIterator(iterKind, opts) r.Free() + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } return iter } if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. - return newPebbleIterator(p.batch, nil, opts) + iter := MVCCIterator(newPebbleIterator(p.batch, nil, opts)) + if util.RaceEnabled { + iter = wrapInUnsafeIter(iter) + } + return iter } iter := &p.normalIter @@ -239,7 +247,11 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M } iter.inuse = true - return iter + var rv MVCCIterator = iter + if util.RaceEnabled { + rv = wrapInUnsafeIter(rv) + } + return rv } // NewEngineIterator implements the Batch interface.