Skip to content

Commit

Permalink
Merge #50850
Browse files Browse the repository at this point in the history
50850: importccl: add support for SQLFns in pgdump r=mjibson a=mjibson

SQLFns are functions users can call from SQL that themselves generate and
execute SQL. AddGeometryColumn, for example, creates an ADD COLUMN statement
and executes it. Add support for these in IMPORT PGDUMP by detecting them,
running them, and feeding the SQL they produce back into the table statement
reader.
    
Release note (sql change): add support for AddGeometryColumn and other
functions that mutate schema in IMPORT PGDUMP.


Co-authored-by: Matt Jibson <[email protected]>
  • Loading branch information
craig[bot] and maddyblue committed Jul 3, 2020
2 parents 112ff53 + f5d918c commit b6e7102
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 68 deletions.
40 changes: 40 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4016,6 +4016,46 @@ func TestImportPgDump(t *testing.T) {
}
}

// TestImportPgDumpGeo checks that IMPORT PGDUMP accepts a dump containing
// SQLFn calls: functions such as AddGeometryColumn that build and run SQL
// of their own when invoked. Dumps of this shape are emitted by shp2pgsql
// (https://manpages.debian.org/stretch/postgis/shp2pgsql.1.en.html).
//
// The same geo.sql file is loaded twice, once through IMPORT PGDUMP into
// importdb and once by executing it as plain SQL in execdb, and the
// resulting table definitions and rows are compared.
func TestImportPgDumpGeo(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const (
		numNodes = 1

		showImported = "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]"
		showExecuted = "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]"
		rowsImported = "SELECT * FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks"
		rowsExecuted = "SELECT * FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks"
	)

	ctx := context.Background()
	dumpDir := filepath.Join("testdata", "pgdump")
	cluster := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{ExternalIODir: dumpDir},
	})
	defer cluster.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(cluster.Conns[0])

	// First pass: load the dump through IMPORT PGDUMP.
	runner.Exec(t, `CREATE DATABASE importdb; SET DATABASE = importdb`)
	runner.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo.sql'")

	// Second pass: run the very same file as ordinary SQL statements.
	runner.Exec(t, `CREATE DATABASE execdb; SET DATABASE = execdb`)
	dump, err := ioutil.ReadFile(filepath.Join(dumpDir, "geo.sql"))
	if err != nil {
		t.Fatal(err)
	}
	runner.Exec(t, string(dump))

	// The table definitions must match. The column families differ only in
	// ordering: executing the file leaves geom last, while importing leaves
	// rowid last. Rewrite the imported statement to the executed ordering
	// before comparing.
	wantCreate := runner.QueryStr(t, showImported)
	wantCreate[0][0] = strings.Replace(wantCreate[0][0], "geom, rowid", "rowid, geom", 1)
	runner.CheckQueryResults(t, showExecuted, wantCreate)

	// The row contents must match as well.
	wantRows := runner.QueryStr(t, rowsImported)
	runner.CheckQueryResults(t, rowsExecuted, wantRows)
}

func TestImportCockroachDump(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
213 changes: 145 additions & 68 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,83 +299,160 @@ func readPostgresCreateTable(
if err != nil {
return nil, errors.Wrap(err, "postgres parse error")
}
switch stmt := stmt.(type) {
case *tree.CreateTable:
name, err := getTableName(&stmt.Table)
if err != nil {
return nil, err
}
if match != "" && match != name {
createTbl[name] = nil
} else {
createTbl[name] = stmt
}
case *tree.CreateIndex:
name, err := getTableName(&stmt.Table)
if err != nil {
return nil, err
}
create := createTbl[name]
if create == nil {
break
}
var idx tree.TableDef = &tree.IndexTableDef{
Name: stmt.Name,
Columns: stmt.Columns,
Storing: stmt.Storing,
Inverted: stmt.Inverted,
Interleave: stmt.Interleave,
PartitionBy: stmt.PartitionBy,
}
if stmt.Unique {
idx = &tree.UniqueConstraintTableDef{IndexTableDef: *idx.(*tree.IndexTableDef)}
}
create.Defs = append(create.Defs, idx)
case *tree.AlterTable:
name, err := getTableName2(stmt.Table)
if err != nil {
return nil, err
}
create := createTbl[name]
if create == nil {
break
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt); err != nil {
return nil, err
}
}
}

func readPostgresStmt(
ctx context.Context,
evalCtx *tree.EvalContext,
match string,
fks fkHandler,
createTbl map[string]*tree.CreateTable,
createSeq map[string]*tree.CreateSequence,
tableFKs map[string][]*tree.ForeignKeyConstraintTableDef,
stmt interface{},
) error {
switch stmt := stmt.(type) {
case *tree.CreateTable:
name, err := getTableName(&stmt.Table)
if err != nil {
return err
}
if match != "" && match != name {
createTbl[name] = nil
} else {
createTbl[name] = stmt
}
case *tree.CreateIndex:
name, err := getTableName(&stmt.Table)
if err != nil {
return err
}
create := createTbl[name]
if create == nil {
break
}
var idx tree.TableDef = &tree.IndexTableDef{
Name: stmt.Name,
Columns: stmt.Columns,
Storing: stmt.Storing,
Inverted: stmt.Inverted,
Interleave: stmt.Interleave,
PartitionBy: stmt.PartitionBy,
}
if stmt.Unique {
idx = &tree.UniqueConstraintTableDef{IndexTableDef: *idx.(*tree.IndexTableDef)}
}
create.Defs = append(create.Defs, idx)
case *tree.AlterTable:
name, err := getTableName2(stmt.Table)
if err != nil {
return err
}
create := createTbl[name]
if create == nil {
break
}
for _, cmd := range stmt.Cmds {
switch cmd := cmd.(type) {
case *tree.AlterTableAddConstraint:
switch con := cmd.ConstraintDef.(type) {
case *tree.ForeignKeyConstraintTableDef:
if !fks.skip {
tableFKs[name] = append(tableFKs[name], con)
}
default:
create.Defs = append(create.Defs, cmd.ConstraintDef)
}
case *tree.AlterTableSetDefault:
for i, def := range create.Defs {
def, ok := def.(*tree.ColumnTableDef)
if !ok || def.Name != cmd.Column {
continue
}
def.DefaultExpr.Expr = cmd.Default
create.Defs[i] = def
}
case *tree.AlterTableAddColumn:
if cmd.IfNotExists {
return errors.Errorf("unsupported statement: %s", stmt)
}
create.Defs = append(create.Defs, cmd.ColumnDef)
case *tree.AlterTableValidateConstraint:
// ignore
default:
return errors.Errorf("unsupported statement: %s", stmt)
}
for _, cmd := range stmt.Cmds {
switch cmd := cmd.(type) {
case *tree.AlterTableAddConstraint:
switch con := cmd.ConstraintDef.(type) {
case *tree.ForeignKeyConstraintTableDef:
if !fks.skip {
tableFKs[name] = append(tableFKs[name], con)
}
case *tree.CreateSequence:
name, err := getTableName(&stmt.Name)
if err != nil {
return err
}
if match == "" || match == name {
createSeq[name] = stmt
}
// Some SELECT statements mutate schema. Search for those here. If it is not exactly a SELECT that mutates
// schema, ignore it.
case *tree.Select:
switch sel := stmt.Select.(type) {
case *tree.SelectClause:
for _, selExpr := range sel.Exprs {
switch expr := selExpr.Expr.(type) {
case *tree.FuncExpr:
// Look for function calls that mutate schema (this is actually a thing).
semaCtx := tree.MakeSemaContext()
if _, err := expr.TypeCheck(ctx, &semaCtx, nil /* desired */); err != nil {
return err
}
ov := expr.ResolvedOverload()
// Search for a SQLFn, which returns a SQL string to execute.
fn := ov.SQLFn
if fn == nil {
// This is some other function type, which we don't care about.
continue
}
// Attempt to convert all func exprs to datums.
datums := make(tree.Datums, len(expr.Exprs))
for i, ex := range expr.Exprs {
d, ok := ex.(tree.Datum)
if !ok {
// We got something that wasn't a datum so we can't call the
// overload. Since this is a SQLFn and the user would have
// expected us to execute it, we have to error.
return errors.Errorf("unsupported statement: %s", stmt)
}
default:
create.Defs = append(create.Defs, cmd.ConstraintDef)
datums[i] = d
}
case *tree.AlterTableSetDefault:
for i, def := range create.Defs {
def, ok := def.(*tree.ColumnTableDef)
if !ok || def.Name != cmd.Column {
continue
// Now that we have all of the datums, we can execute the overload.
fnSQL, err := fn(evalCtx, datums)
if err != nil {
return err
}
// We have some sql. Parse and process it.
fnStmts, err := parser.Parse(fnSQL)
if err != nil {
return err
}
for _, fnStmt := range fnStmts {
switch ast := fnStmt.AST.(type) {
case *tree.AlterTable:
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast); err != nil {
return err
}
default:
// We only support ALTER statements returned from a SQLFn.
return errors.Errorf("unsupported statement: %s", stmt)
}
def.DefaultExpr.Expr = cmd.Default
create.Defs[i] = def
}
case *tree.AlterTableValidateConstraint:
// ignore
default:
return nil, errors.Errorf("unsupported statement: %s", stmt)
}
}
case *tree.CreateSequence:
name, err := getTableName(&stmt.Name)
if err != nil {
return nil, err
}
if match == "" || match == name {
createSeq[name] = stmt
}
}
}
return nil
}

func getTableName(tn *tree.TableName) (string, error) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/ccl/importccl/testdata/pgdump/geo.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- The two commented-out fragments below (the gid serial column and the
-- ADD PRIMARY KEY (gid) statement) are disabled because IMPORT doesn't
-- support DEFAULT functions (#48253). This file is otherwise exactly
-- what shp2pgsql produces.

SET CLIENT_ENCODING TO UTF8;
SET STANDARD_CONFORMING_STRINGS TO ON;
BEGIN;
CREATE TABLE "nyc_census_blocks" (--gid serial,
"blkid" varchar(15),
"popn_total" float8,
"popn_white" float8,
"popn_black" float8,
"popn_nativ" float8,
"popn_asian" float8,
"popn_other" float8,
"boroname" varchar(32));
--ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid);
SELECT AddGeometryColumn('','nyc_census_blocks','geom','26918','MULTIPOLYGON',2);
INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850009001000','97','51','32','1','5','8','Staten Island','010600002026690000010000000103000000010000000A00000051AC161881A22141A31409CF1F2A51415F4321458DA2214100102A3F1D2A51418C34807C0BA221414E3E89F5122A5141782D605495A12141780D1CE92A2A51410D1C9C6770A121410F2D6074322A5141441560E0B0A02141A00099C72F2A51412365B4789AA021419F60A7BB342A514160E3E8FA66A0214118B4C0CE402A5141EA4BF3EEC7A12141A3023D61452A514151AC161881A22141A31409CF1F2A5141');
INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850020011000','66','52','2','0','7','5','Staten Island','0106000020266900000100000001030000000100000007000000083B4A6F79A8214127EC57B49926514151B51BB7CEA72141B2EAD6F38A2651416F429640B9A72141449FCB1C89265141163AA64D56A72141B89E2B7C9B26514150509213EDA72141DCC9A351A826514184FA4C6017A82141B9AE24F0AB265141083B4A6F79A8214127EC57B499265141');
INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850040001000','62','14','18','2','25','3','Staten Island','010600002026690000010000000103000000010000000600000082DCED72969D2141563247C49E2651417C120440079D214123319BFC8626514179D4895B6A9C2141F3667FC995265141C0428AC2C29C214159EB5C75AC265141CB126202D69C214180215728B126514182DCED72969D2141563247C49E265141');
COMMIT;

0 comments on commit b6e7102

Please sign in to comment.