Skip to content

Commit

Permalink
importccl: cleanup how we ignore stmts in IMPORT PGDUMP
Browse files Browse the repository at this point in the history
There are two kinds of unsupported statements when processing an IMPORT
PGDUMP:
- Unparseable
- Parseable, but unsupported

Previously, we had a set of regex statements to skip over statements
which fell into the first category. Those which fell into the second
category were skipped when we saw an AST Node of that kind.

This commit is responsible for the following:

- Removes regex and adds/modifies yacc rules to encompass these regex
  representations.

- Introduces a special UnsupportedNode which backs all yacc rules
  which previously returned an unimplemented error.

- Introduces an IMPORT PGDUMP option, `ignore_unsupported`, to skip
  unsupported statements.

- All statements which are not parseable will now be ignored if the
  option is set.

Note: the eventual goal is to have the `ignore_unsupported` flag also
allow users to ignore parseable, but unsupported statements. This will
be done in the next commit.

Release note (sql change): New IMPORT PGDUMP option `ignore_unsupported`
to skip over all the unsupported PGDUMP stmts. The collection of these
statements will be appropriately documented.
  • Loading branch information
adityamaru committed Jan 29, 2021
1 parent 58a7d76 commit dc70754
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 163 deletions.
13 changes: 11 additions & 2 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ const (
avroSchema = "schema"
avroSchemaURI = "schema_uri"

pgDumpIgnoreAllUnsupported = "ignore_unsupported"

// runningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed.
runningStatusImportBundleParseSchema jobs.RunningStatus = "parsing schema on Import Bundle"
Expand Down Expand Up @@ -133,6 +135,8 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroRecordsSeparatedBy: sql.KVStringOptRequireValue,
avroBinRecords: sql.KVStringOptRequireNoValue,
avroJSONRecords: sql.KVStringOptRequireNoValue,

pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
Expand Down Expand Up @@ -162,7 +166,8 @@ var mysqlOutAllowedOptions = makeStringSet(
)
var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit)
var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit,
pgDumpIgnoreAllUnsupported)

// DROP is required because the target table needs to be taken offline during
// IMPORT INTO.
Expand Down Expand Up @@ -607,6 +612,9 @@ func importPlanHook(
maxRowSize = int32(sz)
}
format.PgDump.MaxRowSize = maxRowSize
if _, ok := opts[pgDumpIgnoreAllUnsupported]; ok {
format.PgDump.IgnoreUnsupported = true
}

if override, ok := opts[csvRowLimit]; ok {
rowLimit, err := strconv.Atoi(override)
Expand Down Expand Up @@ -1298,7 +1306,8 @@ func parseAndCreateBundleTableDescs(
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner)
tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID,
walltime, fks, int(format.PgDump.MaxRowSize), owner, format.PgDump.IgnoreUnsupported)
default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
Expand Down
90 changes: 62 additions & 28 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,29 +1020,6 @@ END;
data: "create table s.t (i INT8)",
err: `non-public schemas unsupported: s`,
},
{
name: "various create ignores",
typ: "PGDUMP",
data: `
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;
CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
AS $_$
SELECT $1 ~ '^[0-9]+$'
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;
CREATE TABLE t (i INT8);
`,
query: map[string][][]string{
getTablesQuery: {{"public", "t", "table"}},
},
},
{
name: "many tables",
typ: "PGDUMP",
Expand Down Expand Up @@ -1597,7 +1574,8 @@ func TestImportRowLimit(t *testing.T) {
expectedRowLimit := 4

// Import a single table `second` and verify number of rows imported.
importQuery := fmt.Sprintf(`IMPORT TABLE second FROM PGDUMP ($1) WITH row_limit="%d"`, expectedRowLimit)
importQuery := fmt.Sprintf(`IMPORT TABLE second FROM PGDUMP ($1) WITH row_limit="%d",ignore_unsupported`,
expectedRowLimit)
sqlDB.Exec(t, importQuery, second...)

var numRows int
Expand All @@ -1608,7 +1586,7 @@ func TestImportRowLimit(t *testing.T) {

// Import multiple tables including `simple` and `second`.
expectedRowLimit = 3
importQuery = fmt.Sprintf(`IMPORT PGDUMP ($1) WITH row_limit="%d"`, expectedRowLimit)
importQuery = fmt.Sprintf(`IMPORT PGDUMP ($1) WITH row_limit="%d",ignore_unsupported`, expectedRowLimit)
sqlDB.Exec(t, importQuery, multitable...)
sqlDB.QueryRow(t, "SELECT count(*) FROM second").Scan(&numRows)
require.Equal(t, expectedRowLimit, numRows)
Expand Down Expand Up @@ -5536,9 +5514,9 @@ func TestImportPgDump(t *testing.T) {
},
{`single table dump`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1)`, simple},
{`second table dump`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1)`, second},
{`simple from multi`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1)`, multitable},
{`second from multi`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1)`, multitable},
{`all from multi`, expectAll, `IMPORT PGDUMP ($1)`, multitable},
{`simple from multi`, expectSimple, `IMPORT TABLE simple FROM PGDUMP ($1) WITH ignore_unsupported`, multitable},
{`second from multi`, expectSecond, `IMPORT TABLE second FROM PGDUMP ($1) WITH ignore_unsupported`, multitable},
{`all from multi`, expectAll, `IMPORT PGDUMP ($1) WITH ignore_unsupported`, multitable},
} {
t.Run(c.name, func(t *testing.T) {
sqlDB.Exec(t, `DROP TABLE IF EXISTS simple, second`)
Expand Down Expand Up @@ -5676,6 +5654,62 @@ func TestImportPgDump(t *testing.T) {
})
}

// TestImportPgDumpIgnoredStmts exercises the `ignore_unsupported` option of
// IMPORT PGDUMP. A dump mixing ordinary statements with statements that are
// meant to be skipped is served over HTTP and imported twice: once with the
// option (the import must succeed and the regular statements must still take
// effect) and once without it (the import must fail with a syntax error).
func TestImportPgDumpIgnoredStmts(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1 /* nodes */, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

	// The dump interleaves statements that should be skipped (triggers,
	// sequence grants/revokes, functions, comments, extensions) with a table
	// definition and an INSERT whose effects are verified below.
	dump := `
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;
CREATE TABLE foo (id INT);
CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
AS $_$
SELECT $1 ~ '^[0-9]+$'
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;
INSERT INTO foo VALUES (1), (2), (3);
CREATE TABLE t (i INT8);
COMMENT ON TABLE t IS 'This should be skipped';
COMMENT ON DATABASE t IS 'This should be skipped';
COMMENT ON COLUMN t IS 'This should be skipped';
COMMENT ON EXTENSION;
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
`
	payload := []byte(dump)

	// Serve the dump to GET requests only; any other method gets an empty
	// response body.
	serveDump := func(w http.ResponseWriter, r *http.Request) {
		if r.Method != "GET" {
			return
		}
		_, _ = w.Write(payload)
	}
	server := httptest.NewServer(http.HandlerFunc(serveDump))
	defer server.Close()

	t.Run("ignore-unsupported", func(t *testing.T) {
		sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", server.URL)
		// Statements that are not meant to be ignored must still have been
		// processed: the rows inserted into foo should be present.
		wantRows := [][]string{{"1"}, {"2"}, {"3"}}
		sqlDB.CheckQueryResults(t, "SELECT * FROM foo", wantRows)
		sqlDB.Exec(t, "DROP TABLE foo")
	})

	t.Run("dont-ignore-unsupported", func(t *testing.T) {
		// Without the option the import is expected to be rejected.
		sqlDB.ExpectErr(t, "syntax error", "IMPORT PGDUMP ($1)", server.URL)
	})
}

// TestImportPgDumpGeo tests that a file with SQLFn classes can be
// imported. These are functions like AddGeometryColumn which create and
// execute SQL when called (!). They are, for example, used by shp2pgsql
Expand Down
73 changes: 39 additions & 34 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ import (
)

type postgreStream struct {
s *bufio.Scanner
copy *postgreStreamCopy
s *bufio.Scanner
copy *postgreStreamCopy
ignoreUnsupportedStmts bool
}

// newPostgreStream returns a struct that can stream statements from an
// io.Reader.
func newPostgreStream(r io.Reader, max int) *postgreStream {
func newPostgreStream(r io.Reader, max int, ignoreUnsupportedStmts bool) *postgreStream {
s := bufio.NewScanner(r)
s.Buffer(nil, max)
p := &postgreStream{s: s}
p := &postgreStream{s: s, ignoreUnsupportedStmts: ignoreUnsupportedStmts}
s.Split(p.split)
return p
}
Expand Down Expand Up @@ -95,14 +96,19 @@ func (p *postgreStream) Next() (interface{}, error) {

for p.s.Scan() {
t := p.s.Text()
// Regardless if we can parse the statement, check that it's not something
// we want to ignore.
if isIgnoredStatement(t) {
continue
}
skipOverComments(t)

stmts, err := parser.Parse(t)
if err != nil {
// There are some statements which CRDB is unable to parse but we have
// explicitly marked as "to be skipped" during a PGDUMP import.
// TODO(adityamaru): Write these to a shunt file to see what has been
// skipped.
if errors.HasType(err, (*tree.Unsupported)(nil)) {
if p.ignoreUnsupportedStmts {
continue
}
}
return nil, err
}
switch len(stmts) {
Expand Down Expand Up @@ -145,20 +151,10 @@ func (p *postgreStream) Next() (interface{}, error) {
}

var (
ignoreComments = regexp.MustCompile(`^\s*(--.*)`)
ignoreStatements = []*regexp.Regexp{
regexp.MustCompile("(?i)^alter function"),
regexp.MustCompile("(?i)^alter sequence .* owned by"),
regexp.MustCompile("(?i)^comment on"),
regexp.MustCompile("(?i)^create extension"),
regexp.MustCompile("(?i)^create function"),
regexp.MustCompile("(?i)^create trigger"),
regexp.MustCompile("(?i)^grant .* on sequence"),
regexp.MustCompile("(?i)^revoke .* on sequence"),
}
ignoreComments = regexp.MustCompile(`^\s*(--.*)`)
)

func isIgnoredStatement(s string) bool {
func skipOverComments(s string) {
// Look for the first line with no whitespace or comments.
for {
m := ignoreComments.FindStringIndex(s)
Expand All @@ -167,13 +163,6 @@ func isIgnoredStatement(s string) bool {
}
s = s[m[1]:]
}
s = strings.TrimSpace(s)
for _, re := range ignoreStatements {
if re.MatchString(s) {
return true
}
}
return false
}

type regclassRewriter struct{}
Expand Down Expand Up @@ -229,6 +218,7 @@ func readPostgresCreateTable(
fks fkHandler,
max int,
owner security.SQLUsername,
ignoreUnsupportedStmts bool,
) ([]*tabledesc.Mutable, error) {
// Modify the CreateTable stmt with the various index additions. We do this
// instead of creating a full table descriptor first and adding indexes
Expand All @@ -239,7 +229,7 @@ func readPostgresCreateTable(
createTbl := make(map[string]*tree.CreateTable)
createSeq := make(map[string]*tree.CreateSequence)
tableFKs := make(map[string][]*tree.ForeignKeyConstraintTableDef)
ps := newPostgreStream(input, max)
ps := newPostgreStream(input, max, ignoreUnsupportedStmts)
for {
stmt, err := ps.Next()
if err == io.EOF {
Expand Down Expand Up @@ -310,7 +300,8 @@ func readPostgresCreateTable(
if err != nil {
return nil, errors.Wrap(err, "postgres parse error")
}
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p, parentID); err != nil {
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p,
parentID, ignoreUnsupportedStmts); err != nil {
return nil, err
}
}
Expand All @@ -327,6 +318,7 @@ func readPostgresStmt(
stmt interface{},
p sql.JobExecContext,
parentID descpb.ID,
ignoreUnsupportedStmts bool,
) error {
switch stmt := stmt.(type) {
case *tree.CreateTable:
Expand Down Expand Up @@ -438,6 +430,11 @@ func readPostgresStmt(
if match == "" || match == name {
createSeq[name] = stmt
}
case *tree.AlterSequence:
if len(stmt.Options) != 1 || stmt.Options[0].Name != tree.SeqOptOwnedBy ||
!ignoreUnsupportedStmts {
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
}
// Some SELECT statements mutate schema. Search for those here.
case *tree.Select:
switch sel := stmt.Select.(type) {
Expand Down Expand Up @@ -493,7 +490,8 @@ func readPostgresStmt(
for _, fnStmt := range fnStmts {
switch ast := fnStmt.AST.(type) {
case *tree.AlterTable:
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast, p, parentID); err != nil {
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq,
tableFKs, ast, p, parentID, ignoreUnsupportedStmts); err != nil {
return err
}
default:
Expand Down Expand Up @@ -534,13 +532,16 @@ func readPostgresStmt(
return err
}
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn:
// ignore parsed but unsupported stmts.
case *tree.BeginTransaction, *tree.CommitTransaction:
// ignore txns.
case *tree.SetVar, *tree.Insert, *tree.CopyFrom, copyData, *tree.Delete:
// ignore SETs and DMLs.
case *tree.Analyze:
// ANALYZE is syntatictic sugar for CreateStatistics. It can be ignored because
// the auto stats stuff will pick up the changes and run if needed.
// ANALYZE is syntactic sugar for CreateStatistics. It can be ignored because
// the auto stats stuff will pick up the changes and run if needed.
case error:
if !errors.Is(stmt, errCopyDone) {
return stmt
Expand Down Expand Up @@ -655,7 +656,7 @@ func (m *pgDumpReader) readFile(
tableNameToRowsProcessed := make(map[string]int64)
var inserts, count int64
rowLimit := m.opts.RowLimit
ps := newPostgreStream(input, int(m.opts.MaxRowSize))
ps := newPostgreStream(input, int(m.opts.MaxRowSize), m.opts.IgnoreUnsupported)
semaCtx := tree.MakeSemaContext()
for _, conv := range m.tables {
conv.KvBatch.Source = inputIdx
Expand Down Expand Up @@ -858,6 +859,7 @@ func (m *pgDumpReader) readFile(

switch funcName := strings.ToLower(fn.Func.String()); funcName {
case "search_path", "pg_catalog.set_config":

continue
case "setval", "pg_catalog.setval":
if args := len(fn.Exprs); args < 2 || args > 3 {
Expand Down Expand Up @@ -913,6 +915,9 @@ func (m *pgDumpReader) readFile(
default:
return errors.Errorf("unsupported function: %s", funcName)
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.AlterSequence:
// parseable but ignored during schema extraction.
case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze:
// ignored.
case *tree.CreateTable, *tree.AlterTable, *tree.AlterTableOwner, *tree.CreateIndex, *tree.CreateSequence, *tree.DropTable:
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/importccl/read_import_pgdump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ select '123456789012345678901234567890123456789012345678901234567890123456789012
--
`

p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer)
p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer,
false /* ignoreUnsupportedStmts */)
var sb strings.Builder
for {
s, err := p.Next()
Expand Down Expand Up @@ -121,7 +122,8 @@ COPY public.t (s) FROM stdin;
--
`

p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer)
p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer,
false /* ignoreUnsupportedStmts */)
var sb strings.Builder
for {
s, err := p.Next()
Expand Down
Loading

0 comments on commit dc70754

Please sign in to comment.