From ea381b0c0b09f674fd753a55c057630de1eb95cb Mon Sep 17 00:00:00 2001
From: Monica Xu
Date: Wed, 18 Nov 2020 16:25:54 -0500
Subject: [PATCH] importccl: Add DROP TABLE [IF EXISTS] support for import
 pgdump.

Previously, IMPORT PGDUMP failed whenever the dump contained a DROP TABLE
statement, because DROP TABLE statements were not supported.

Now, when we encounter a DROP TABLE statement for a target table foo, there
are two cases. If foo already exists in the target database, we return an
error asking the user to drop foo and then retry the import. If foo does not
exist, we silently ignore the DROP statement and proceed with the pgdump
import.

Release note: None
---
 pkg/ccl/importccl/import_stmt_test.go   | 85 +++++++++++++++++++++++++
 pkg/ccl/importccl/read_import_pgdump.go | 36 ++++++++++-
 2 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index adedd9dfb8d5..b0a2f8da5199 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -5334,6 +5334,91 @@ func TestImportPgDumpGeo(t *testing.T) {
 	})
 }
 
+func TestImportPgDumpDropTable(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	baseDir := filepath.Join("testdata")
+	args := base.TestServerArgs{ExternalIODir: baseDir}
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args})
+	defer tc.Stopper().Stop(ctx)
+	conn := tc.Conns[0]
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+
+	var data string
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "GET" {
+			_, _ = w.Write([]byte(data))
+		}
+	}))
+	defer srv.Close()
+
+	// If the target table of a DROP statement already exists, the import fails with an error.
+	t.Run("table exists", func(t *testing.T) {
+
+		// Set up: ensure table `t` exists before the import.
+		sqlDB.Exec(t, `DROP TABLE IF EXISTS t; CREATE TABLE t (a INT);`)
+
+		// Import PGDUMP data that includes a DROP TABLE statement.
+		data = `DROP TABLE t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
+		sqlDB.ExpectErr(t, `drop table "t" and then retry the import`, `IMPORT PGDUMP ($1)`, srv.URL)
+
+		// Also expect an error for an existing table when using IF EXISTS.
+		data = `DROP TABLE IF EXISTS t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
+		sqlDB.ExpectErr(t, `drop table "t" and then retry the import`, `IMPORT PGDUMP ($1)`, srv.URL)
+
+		// Cleanup.
+		sqlDB.Exec(t, `DROP TABLE t`)
+	})
+
+	// If the target table of a DROP statement does not exist, the statement is ignored.
+	t.Run("table does not exist", func(t *testing.T) {
+
+		// Set up: ensure table `t` does not exist before the import.
+		sqlDB.Exec(t, `DROP TABLE IF EXISTS t;`)
+
+		// No error should be thrown for the DROP statement.
+		data = `DROP TABLE t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
+		expected := [][]string{{"4"}}
+
+		sqlDB.Exec(t, `IMPORT PGDUMP ($1)`, srv.URL)
+		sqlDB.CheckQueryResults(t, `SELECT * FROM t`, expected)
+
+		// Drop the table `t` that the pgdump import created so that
+		// table `t` does not exist for the IF EXISTS case below.
+		sqlDB.Exec(t, `DROP TABLE t;`)
+
+		// Also expect no error and a successful import with IF EXISTS.
+		data = `DROP TABLE IF EXISTS t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
+		sqlDB.Exec(t, `IMPORT PGDUMP ($1)`, srv.URL)
+		sqlDB.CheckQueryResults(t, `SELECT * FROM t`, expected)
+
+		// Cleanup.
+		sqlDB.Exec(t, `DROP TABLE t`)
+	})
+
+	t.Run("multiple tables and drops", func(t *testing.T) {
+		// Set up: ensure tables `t` and `u` do not exist.
+		sqlDB.Exec(t, `DROP TABLE IF EXISTS t, u;`)
+
+		// Import table `t` successfully.
+		data = `DROP TABLE t; CREATE TABLE t (a INT)`
+		sqlDB.Exec(t, `IMPORT PGDUMP ($1)`, srv.URL)
+
+		// Table `u` does not exist, so its DROP is ignored and it would be created.
+		// Table `t` exists, however, so an error is thrown for table `t` and the import fails.
+		data = `DROP TABLE u;
+		CREATE TABLE u (a INT);
+		INSERT INTO u VALUES (55);
+		DROP TABLE t;`
+		sqlDB.ExpectErr(t, `drop table "t" and then retry the import`, `IMPORT PGDUMP ($1)`, srv.URL)
+
+		// Since the PGDUMP import failed, table `u` should not exist.
+		sqlDB.ExpectErr(t, `does not exist`, `SELECT * FROM u`)
+	})
+}
+
 func TestImportCockroachDump(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go
index 84c698311b88..bf9dbcb60088 100644
--- a/pkg/ccl/importccl/read_import_pgdump.go
+++ b/pkg/ccl/importccl/read_import_pgdump.go
@@ -15,7 +15,9 @@ import (
 	"regexp"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/sql"
@@ -309,7 +311,7 @@ func readPostgresCreateTable(
 		if err != nil {
 			return nil, errors.Wrap(err, "postgres parse error")
 		}
-		if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt); err != nil {
+		if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p, parentID); err != nil {
 			return nil, err
 		}
 	}
@@ -324,6 +326,8 @@ func readPostgresStmt(
 	createSeq map[string]*tree.CreateSequence,
 	tableFKs map[string][]*tree.ForeignKeyConstraintTableDef,
 	stmt interface{},
+	p sql.JobExecContext,
+	parentID descpb.ID,
 ) error {
 	switch stmt := stmt.(type) {
 	case *tree.CreateTable:
@@ -490,7 +494,7 @@ func readPostgresStmt(
 			for _, fnStmt := range fnStmts {
 				switch ast := fnStmt.AST.(type) {
 				case *tree.AlterTable:
-					if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast); err != nil {
+					if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast, p, parentID); err != nil {
 						return err
 					}
 				default:
@@ -505,6 +509,32 @@ func readPostgresStmt(
 		default:
 			return errors.Errorf("unsupported %T SELECT: %s", sel, sel)
 		}
+	case *tree.DropTable:
+		names := stmt.Names
+
+		// If a table with the same name already exists in the public schema of
+		// the target database, throw an error telling the user to drop the
+		// conflicting table and then retry the import.
+		// Otherwise, silently ignore the DROP statement and continue with the import.
+		for _, name := range names {
+			tableName := name.ToUnresolvedObjectName().String()
+			if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+				err := backupccl.CheckObjectExists(
+					ctx,
+					txn,
+					p.ExecCfg().Codec,
+					parentID,
+					keys.PublicSchemaID,
+					tableName,
+				)
+				if err != nil {
+					return errors.Wrapf(err, `drop table "%s" and then retry the import`, tableName)
+				}
+				return nil
+			}); err != nil {
+				return err
+			}
+		}
 	case *tree.BeginTransaction, *tree.CommitTransaction:
 		// ignore txns.
 	case *tree.SetVar, *tree.Insert, *tree.CopyFrom, copyData, *tree.Delete:
@@ -870,7 +900,7 @@ func (m *pgDumpReader) readFile(
 			}
 		case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze:
 			// ignored.
-		case *tree.CreateTable, *tree.AlterTable, *tree.CreateIndex, *tree.CreateSequence:
+		case *tree.CreateTable, *tree.AlterTable, *tree.CreateIndex, *tree.CreateSequence, *tree.DropTable:
 			// handled during schema extraction.
 		case *tree.Delete:
 			switch stmt := i.Table.(type) {
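
Illustrative note (not part of the patch): a minimal sketch of the user-visible behavior this change enables. The dump contents, the table name foo, and the URL below are hypothetical placeholders, not taken from the patch.

-- Hypothetical contents of the served dump file (dump.sql):
--   DROP TABLE IF EXISTS foo;
--   CREATE TABLE foo (a INT);
--   INSERT INTO foo VALUES (1);

-- Case 1: foo does not exist in the target database.
-- The DROP statement is silently ignored and the import succeeds.
IMPORT PGDUMP ('https://example.com/dump.sql');

-- Case 2: foo already exists in the target database.
-- The import fails with an error like:
--   drop table "foo" and then retry the import
-- Dropping the conflicting table manually and retrying then succeeds.
DROP TABLE foo;
IMPORT PGDUMP ('https://example.com/dump.sql');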