Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: Add DROP TABLE [IF EXISTS] support for import pgdump. #56920

Merged
merged 1 commit into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5334,6 +5334,91 @@ func TestImportPgDumpGeo(t *testing.T) {
})
}

func TestImportPgDumpDropTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir := filepath.Join("testdata")
args := base.TestServerArgs{ExternalIODir: baseDir}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

// If the target table for a DROP exists, we throw an error.
t.Run("table exists", func(t *testing.T) {

// Set up table `t` exists for testing.
sqlDB.Exec(t, `DROP TABLE IF EXISTS t; CREATE TABLE t (a INT);`)

// Import PGDump data which includes DROP TABLE.
data = `DROP TABLE t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
sqlDB.ExpectErr(t, `drop table "t" and then retry the import`, `IMPORT PGDUMP ($1)`, srv.URL)

// Also expect error on existing table with IF EXISTS.
data = `DROP TABLE IF EXISTS t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
sqlDB.ExpectErr(t, `drop table "t" and then retry the import`, `IMPORT PGDUMP ($1)`, srv.URL)

// Cleanup.
sqlDB.Exec(t, `DROP TABLE t`)
})

// If the target table for a DROP does not exist, we ignore the statement.
t.Run("table does not exist", func(t *testing.T) {

// Set up table `t` does not exist for testing.
sqlDB.Exec(t, `DROP TABLE IF EXISTS t;`)

// No error should be thrown with DROP statement.
data = `DROP TABLE t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
expected := [][]string{{"4"}}

sqlDB.Exec(t, `IMPORT PGDUMP ($1)`, srv.URL)
sqlDB.CheckQueryResults(t, `SELECT * FROM t`, expected)

// Drop the table `t` that pgdump imported.
// Now table `t` does not exist for the IF EXISTS example.
sqlDB.Exec(t, `DROP TABLE t;`)

// Also expect no errors and successful import with IF EXISTS.
data = `DROP TABLE IF EXISTS t; CREATE TABLE t (a INT); INSERT INTO t VALUES (4);`
sqlDB.Exec(t, `IMPORT PGDUMP ($1)`, srv.URL)
sqlDB.CheckQueryResults(t, `SELECT * FROM t`, expected)

// Cleanup.
sqlDB.Exec(t, `DROP TABLE t`)
})

t.Run("multiple tables and drops", func(t *testing.T) {
// Set up.
sqlDB.Exec(t, `DROP TABLE IF EXISTS t, u;`)

// Import table `t` successfully.
data = `DROP TABLE t; CREATE TABLE t (a INT)`
sqlDB.Exec(t, `IMPORT PGDUMP ($1)`, srv.URL)

// Table `u` does not exist, so create it successfully.
// Table `t` exists, so an error is thrown for table `t`.
data = `DROP TABLE u;
CREATE TABLE u (a INT);
INSERT INTO u VALUES (55);
DROP TABLE t;`
sqlDB.ExpectErr(t, `drop table "t" and then retry the import`, `IMPORT PGDUMP ($1)`, srv.URL)

// Since the PGDump failed on error, table `u` should not exist.
sqlDB.ExpectErr(t, `does not exist`, `SELECT * FROM u`)
})
}

func TestImportCockroachDump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
36 changes: 33 additions & 3 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"regexp"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -309,7 +311,7 @@ func readPostgresCreateTable(
if err != nil {
return nil, errors.Wrap(err, "postgres parse error")
}
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt); err != nil {
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p, parentID); err != nil {
return nil, err
}
}
Expand All @@ -324,6 +326,8 @@ func readPostgresStmt(
createSeq map[string]*tree.CreateSequence,
tableFKs map[string][]*tree.ForeignKeyConstraintTableDef,
stmt interface{},
p sql.JobExecContext,
parentID descpb.ID,
) error {
switch stmt := stmt.(type) {
case *tree.CreateTable:
Expand Down Expand Up @@ -490,7 +494,7 @@ func readPostgresStmt(
for _, fnStmt := range fnStmts {
switch ast := fnStmt.AST.(type) {
case *tree.AlterTable:
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast); err != nil {
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast, p, parentID); err != nil {
return err
}
default:
Expand All @@ -505,6 +509,32 @@ func readPostgresStmt(
default:
return errors.Errorf("unsupported %T SELECT: %s", sel, sel)
}
case *tree.DropTable:
names := stmt.Names

// If we find a table with the same name in the target DB we are importing
// into and same public schema, then we throw an error telling the user to
// drop the conflicting existing table to proceed.
// Otherwise, we silently ignore the drop statement and continue with the import.
for _, name := range names {
tableName := name.ToUnresolvedObjectName().String()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := backupccl.CheckObjectExists(
ctx,
txn,
p.ExecCfg().Codec,
parentID,
keys.PublicSchemaID,
tableName,
)
if err != nil {
return errors.Wrapf(err, `drop table "%s" and then retry the import`, tableName)
}
return nil
}); err != nil {
return err
}
}
case *tree.BeginTransaction, *tree.CommitTransaction:
// ignore txns.
case *tree.SetVar, *tree.Insert, *tree.CopyFrom, copyData, *tree.Delete:
Expand Down Expand Up @@ -870,7 +900,7 @@ func (m *pgDumpReader) readFile(
}
case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze:
// ignored.
case *tree.CreateTable, *tree.AlterTable, *tree.CreateIndex, *tree.CreateSequence:
case *tree.CreateTable, *tree.AlterTable, *tree.CreateIndex, *tree.CreateSequence, *tree.DropTable:
// handled during schema extraction.
case *tree.Delete:
switch stmt := i.Table.(type) {
Expand Down