Skip to content

Commit

Permalink
importccl: guard all silently ignored PGDUMP stmts with option
Browse files Browse the repository at this point in the history
Previously, we would silently ignore several statements we might come
across in a PGDUMP file. These statements can be parsed by CRDB but are
not implemented by IMPORT.

This change brings most of these silently ignored stmts under the
umbrella of the newly introduced `ignore_unsupported` option. The
motivation for this change is to further improve the UX of this option.
If specified, we will ignore unsupported stmts, else we will fail the
IMPORT.

This will result in IMPORTs that previously succeeded by silently
ignoring some stmts to fail. The solution is to add the
`ignore_unsupported` option.

Release note (sql change): Users will now need to specify
`ignore_unsupported` to ignore all unsupported import stmts during an
IMPORT PGDUMP.
  • Loading branch information
adityamaru committed Jan 31, 2021
1 parent dc70754 commit 20f169e
Show file tree
Hide file tree
Showing 7 changed files with 27,658 additions and 76 deletions.
145 changes: 145 additions & 0 deletions out

Large diffs are not rendered by default.

27,362 changes: 27,362 additions & 0 deletions output

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func makeInputConverter(
return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_PgDump:
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables,
spec.Format.PgDump.IgnoreUnsupported, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,8 @@ func pgDumpFormat() roachpb.IOFileFormat {
return roachpb.IOFileFormat{
Format: roachpb.IOFileFormat_PgDump,
PgDump: roachpb.PgDumpOptions{
MaxRowSize: 64 * 1024,
MaxRowSize: 64 * 1024,
IgnoreUnsupported: true,
},
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ END;
{
name: "sequence",
typ: "PGDUMP",
with: "WITH ignore_unsupported",
data: `
CREATE TABLE t (a INT8);
CREATE SEQUENCE public.i_seq
Expand Down Expand Up @@ -964,6 +965,7 @@ END;
INSERT INTO "bob" ("c", "b") VALUES (3, 2);
COMMIT
`,
with: `WITH ignore_unsupported`,
query: map[string][][]string{
`SELECT * FROM bob`: {
{"1", "NULL", "2"},
Expand Down Expand Up @@ -5509,11 +5511,11 @@ func TestImportPgDump(t *testing.T) {
CONSTRAINT simple_pkey PRIMARY KEY (i),
UNIQUE INDEX simple_b_s_idx (b, s),
INDEX simple_s_idx (s)
) PGDUMP DATA ($1)`,
) PGDUMP DATA ($1) WITH ignore_unsupported`,
simple,
},
{`single table dump`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1)`, simple},
{`second table dump`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1)`, second},
{`single table dump`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1) WITH ignore_unsupported`, simple},
{`second table dump`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1) WITH ignore_unsupported`, second},
{`simple from multi`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1) WITH ignore_unsupported`, multitable},
{`second from multi`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1) WITH ignore_unsupported`, multitable},
{`all from multi`, expectAll, `IMPORT PGDUMP ($1) WITH ignore_unsupported`, multitable},
Expand Down Expand Up @@ -5730,7 +5732,7 @@ func TestImportPgDumpGeo(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(conn)

sqlDB.Exec(t, `CREATE DATABASE importdb; SET DATABASE = importdb`)
sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_shp2pgsql.sql'")
sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_shp2pgsql.sql' WITH ignore_unsupported")

sqlDB.Exec(t, `CREATE DATABASE execdb; SET DATABASE = execdb`)
geoSQL, err := ioutil.ReadFile(filepath.Join(baseDir, "geo_shp2pgsql.sql"))
Expand Down Expand Up @@ -5765,7 +5767,7 @@ func TestImportPgDumpGeo(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(conn)

sqlDB.Exec(t, `CREATE DATABASE importdb; SET DATABASE = importdb`)
sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_ogr2ogr.sql'")
sqlDB.Exec(t, "IMPORT PGDUMP 'nodelocal://0/geo_ogr2ogr.sql' WITH ignore_unsupported")

sqlDB.Exec(t, `CREATE DATABASE execdb; SET DATABASE = execdb`)
geoSQL, err := ioutil.ReadFile(filepath.Join(baseDir, "geo_ogr2ogr.sql"))
Expand Down Expand Up @@ -5882,7 +5884,7 @@ func TestImportCockroachDump(t *testing.T) {
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

sqlDB.Exec(t, "IMPORT PGDUMP ($1)", "nodelocal://0/cockroachdump/dump.sql")
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", "nodelocal://0/cockroachdump/dump.sql")
sqlDB.CheckQueryResults(t, "SELECT * FROM t ORDER BY i", [][]string{
{"1", "test"},
{"2", "other"},
Expand Down Expand Up @@ -5938,7 +5940,7 @@ func TestCreateStatsAfterImport(t *testing.T) {

sqlDB.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=true`)

sqlDB.Exec(t, "IMPORT PGDUMP ($1)", "nodelocal://0/cockroachdump/dump.sql")
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", "nodelocal://0/cockroachdump/dump.sql")

// Verify that statistics have been created.
sqlDB.CheckQueryResultsRetry(t,
Expand Down
165 changes: 118 additions & 47 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ func readPostgresStmt(
}
case *tree.AlterTableAddColumn:
if cmd.IfNotExists {
if ignoreUnsupportedStmts {
// Write to shunt.
continue
}
return errors.Errorf("unsupported statement: %s", stmt)
}
create.Defs = append(create.Defs, cmd.ColumnDef)
Expand All @@ -415,8 +419,16 @@ func readPostgresStmt(
return colinfo.NewUndefinedColumnError(cmd.Column.String())
}
case *tree.AlterTableValidateConstraint:
// ignore
if ignoreUnsupportedStmts {
// Write to shunt.
continue
}
return errors.Errorf("unsupported statement: %s", stmt)
default:
if ignoreUnsupportedStmts {
// Write to shunt.
continue
}
return errors.Errorf("unsupported statement: %s", stmt)
}
}
Expand All @@ -431,10 +443,11 @@ func readPostgresStmt(
createSeq[name] = stmt
}
case *tree.AlterSequence:
if len(stmt.Options) != 1 || stmt.Options[0].Name != tree.SeqOptOwnedBy ||
!ignoreUnsupportedStmts {
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
if ignoreUnsupportedStmts {
// Write to shunt file.
return nil
}
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
// Some SELECT statements mutate schema. Search for those here.
case *tree.Select:
switch sel := stmt.Select.(type) {
Expand All @@ -458,12 +471,11 @@ func readPostgresStmt(
// Search for a SQLFn, which returns a SQL string to execute.
fn := ov.SQLFn
if fn == nil {
switch f := expr.Func.String(); f {
case "set_config", "setval":
if ignoreUnsupportedStmts {
// Write to shunt file.
continue
default:
return errors.Errorf("unsupported function call: %s", expr.Func.String())
}
return errors.Errorf("unsupported function call: %s", expr.Func.String())
}
// Attempt to convert all func exprs to datums.
datums := make(tree.Datums, len(expr.Exprs))
Expand Down Expand Up @@ -500,10 +512,18 @@ func readPostgresStmt(
}
}
default:
if ignoreUnsupportedStmts {
// Write unsupported select expressions to the SHUNT file.
continue
}
return errors.Errorf("unsupported %T SELECT expr: %s", expr, expr)
}
}
default:
if ignoreUnsupportedStmts {
// Write to shunt file.
return nil
}
return errors.Errorf("unsupported %T SELECT: %s", sel, sel)
}
case *tree.DropTable:
Expand Down Expand Up @@ -532,21 +552,34 @@ func readPostgresStmt(
return err
}
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn:
// ignore parsed by unsupported stmts.
case *tree.BeginTransaction, *tree.CommitTransaction:
// ignore txns.
case *tree.SetVar, *tree.Insert, *tree.CopyFrom, copyData, *tree.Delete:
// ignore SETs and DMLs.
case *tree.Analyze:
// ANALYZE is syntactic sugar for CreateStatistics. It can be ignored because
// the auto stats stuff will pick up the changes and run if needed.
// Ignore transaction statements as they have no meaning during an IMPORT.
// TODO(during review): Should we guard these statements under the
// ignore_unsupported flag as well?
case *tree.Insert, *tree.CopyFrom, *tree.Delete, copyData:
// handled during the data ingestion pass.
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.SetVar, *tree.Analyze:
// These are the statements that can be parsed by CRDB but are not
// supported, or are not required to be processed, during an IMPORT.
// - ignore txns.
// - ignore SETs and DMLs.
// - ANALYZE is syntactic sugar for CreateStatistics. It can be ignored
// because the auto stats stuff will pick up the changes and run if needed.
if ignoreUnsupportedStmts {
// Write to shunt if the user has asked us to skip.
return nil
}
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
case error:
if !errors.Is(stmt, errCopyDone) {
return stmt
}
default:
if ignoreUnsupportedStmts {
// Write to shunt file.
return nil
}
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
}
return nil
Expand Down Expand Up @@ -576,14 +609,15 @@ func getTableName2(u *tree.UnresolvedObjectName) (string, error) {
}

type pgDumpReader struct {
tableDescs map[string]catalog.TableDescriptor
tables map[string]*row.DatumRowConverter
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable
kvCh chan row.KVBatch
opts roachpb.PgDumpOptions
walltime int64
colMap map[*row.DatumRowConverter](map[string]int)
evalCtx *tree.EvalContext
tableDescs map[string]catalog.TableDescriptor
tables map[string]*row.DatumRowConverter
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable
kvCh chan row.KVBatch
opts roachpb.PgDumpOptions
walltime int64
colMap map[*row.DatumRowConverter](map[string]int)
ignoreUnsupportedStmts bool
evalCtx *tree.EvalContext
}

var _ inputConverter = &pgDumpReader{}
Expand All @@ -595,6 +629,7 @@ func newPgDumpReader(
opts roachpb.PgDumpOptions,
walltime int64,
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
ignoreUnsupportedStmts bool,
evalCtx *tree.EvalContext,
) (*pgDumpReader, error) {
tableDescs := make(map[string]catalog.TableDescriptor, len(descs))
Expand Down Expand Up @@ -625,14 +660,15 @@ func newPgDumpReader(
}
}
return &pgDumpReader{
kvCh: kvCh,
tableDescs: tableDescs,
tables: converters,
descs: descs,
opts: opts,
walltime: walltime,
colMap: colMap,
evalCtx: evalCtx,
kvCh: kvCh,
tableDescs: tableDescs,
tables: converters,
descs: descs,
opts: opts,
walltime: walltime,
colMap: colMap,
ignoreUnsupportedStmts: ignoreUnsupportedStmts,
evalCtx: evalCtx,
}, nil
}

Expand Down Expand Up @@ -701,6 +737,10 @@ func (m *pgDumpReader) readFile(
timestamp := timestampAfterEpoch(m.walltime)
values, ok := i.Rows.Select.(*tree.ValuesClause)
if !ok {
if m.ignoreUnsupportedStmts {
// Write to shunt.
continue
}
return errors.Errorf("unsupported: %s", i.Rows.Select)
}
inserts++
Expand Down Expand Up @@ -847,22 +887,41 @@ func (m *pgDumpReader) readFile(
// by pg_dump, and thus if it isn't, we don't try to figure out what to do.
sc, ok := i.Select.(*tree.SelectClause)
if !ok {
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported %T Select: %v", i.Select, i.Select)
}
if len(sc.Exprs) != 1 {
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported %d select args: %v", len(sc.Exprs), sc.Exprs)
}
fn, ok := sc.Exprs[0].Expr.(*tree.FuncExpr)
if !ok {
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported select arg %T: %v", sc.Exprs[0].Expr, sc.Exprs[0].Expr)
}

switch funcName := strings.ToLower(fn.Func.String()); funcName {
case "search_path", "pg_catalog.set_config":

continue
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported %d fn args: %v", len(fn.Exprs), fn.Exprs)
case "setval", "pg_catalog.setval":
if args := len(fn.Exprs); args < 2 || args > 3 {
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported %d fn args: %v", len(fn.Exprs), fn.Exprs)
}
seqname, ok := fn.Exprs[0].(*tree.StrVal)
Expand All @@ -879,6 +938,10 @@ func (m *pgDumpReader) readFile(
}
seqval, ok := fn.Exprs[1].(*tree.NumVal)
if !ok {
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[1], fn.Exprs[1])
}
val, err := seqval.AsInt64()
Expand All @@ -889,6 +952,10 @@ func (m *pgDumpReader) readFile(
if len(fn.Exprs) == 3 {
called, ok := fn.Exprs[2].(*tree.DBool)
if !ok {
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported setval %T arg: %v", fn.Exprs[2], fn.Exprs[2])
}
isCalled = bool(*called)
Expand All @@ -913,27 +980,31 @@ func (m *pgDumpReader) readFile(
case "addgeometrycolumn":
// handled during schema extraction.
default:
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported function: %s", funcName)
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.AlterSequence:
// parseable but ignored during schema extraction.
// handled during schema extraction.
case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze:
// ignored.
case *tree.CreateTable, *tree.AlterTable, *tree.AlterTableOwner, *tree.CreateIndex, *tree.CreateSequence, *tree.DropTable:
// handled during schema extraction.
case *tree.CreateTable, *tree.AlterTable, *tree.AlterTableOwner, *tree.CreateIndex,
*tree.CreateSequence, *tree.DropTable:
// handled during schema extraction.
case *tree.Delete:
switch stmt := i.Table.(type) {
case *tree.AliasedTableExpr:
// ogr2ogr has `DELETE FROM geometry_columns / geography_columns ...` statements.
// We're not planning to support this functionality in CRDB, so it is safe to ignore it when countered in PGDUMP.
if tn, ok := stmt.Expr.(*tree.TableName); !(ok && (tn.Table() == "geometry_columns" || tn.Table() == "geography_columns")) {
return errors.Errorf("unsupported DELETE FROM %T statement: %s", stmt, stmt)
}
default:
return errors.Errorf("unsupported %T statement: %s", i, i)
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported DELETE FROM %T statement: %s", stmt, stmt)
default:
if m.ignoreUnsupportedStmts {
// Write to shunt file.
continue
}
return errors.Errorf("unsupported %T statement: %v", i, i)
}
}
Expand Down
Loading

0 comments on commit 20f169e

Please sign in to comment.