importccl: cleanup how we ignore stmts in IMPORT PGDUMP
There are two kinds of unsupported statements when processing an IMPORT
PGDUMP:
- Unparseable
- Parseable, but unsupported

Previously, we had a set of regexes used to skip statements that fell
into the first category. Statements in the second category were skipped
when we encountered an AST node of that kind.
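
For illustration, here is a minimal sketch (not part of this commit)
that uses CockroachDB's `parser.Parse` to show the two categories.
The example statements are hypothetical, and whether a given statement
parses depends on the CockroachDB version.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/parser"
)

func main() {
	stmts := []string{
		// Parseable by CockroachDB, but not supported by IMPORT PGDUMP.
		"COMMENT ON TABLE t IS 'ignored during import'",
		// Historically unparseable; with this change the grammar raises a
		// special "unsupported" error instead of a plain syntax error.
		"CREATE TRIGGER trg BEFORE UPDATE ON t FOR EACH ROW EXECUTE PROCEDURE f()",
	}
	for _, s := range stmts {
		if _, err := parser.Parse(s); err != nil {
			fmt.Printf("cannot parse: %s: %v\n", s, err)
		} else {
			fmt.Printf("parsed OK (but unsupported by IMPORT): %s\n", s)
		}
	}
}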

This change does not ignore any additional statements, but it removes
the regexes. It does this by leaning on existing yacc grammar rules, or
by adding new "unsupported" rules. These unsupported rules raise a
special kind of error that we catch during IMPORT PGDUMP and that is
decorated with the required information.

This change also introduces an `ignore_unsupported` option for IMPORT
PGDUMP that allows users to skip all statements in both categories
mentioned above. If the option is not set, the import fails with either
a parse error or an unsupported-statement error.
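
As a hedged usage sketch (the connection string and dump URL below are
placeholders, not part of this change), the option is passed like any
other IMPORT option:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; also works against CockroachDB.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With ignore_unsupported set, statements IMPORT cannot handle (both the
	// unparseable and the parseable-but-unsupported kind) are skipped instead
	// of failing the whole import.
	if _, err := db.Exec(
		`IMPORT PGDUMP ($1) WITH ignore_unsupported`,
		"nodelocal://1/dump.sql",
	); err != nil {
		log.Fatal(err)
	}
}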

Release note (sql change): New IMPORT PGDUMP option `ignore_unsupported`
skips over all unsupported PGDUMP statements. The set of statements that
can be skipped will be documented.
adityamaru committed Dec 11, 2020
1 parent 9851b5c commit a92e9ea
Showing 10 changed files with 298 additions and 138 deletions.
13 changes: 11 additions & 2 deletions pkg/ccl/importccl/import_stmt.go
@@ -99,6 +99,8 @@ const (
avroSchema = "schema"
avroSchemaURI = "schema_uri"

pgDumpIgnoreAllUnsupported = "ignore_unsupported"

// RunningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed
runningStatusImportBundleParseSchema jobs.RunningStatus = "parsing schema on Import Bundle"
@@ -133,6 +135,8 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroRecordsSeparatedBy: sql.KVStringOptRequireValue,
avroBinRecords: sql.KVStringOptRequireNoValue,
avroJSONRecords: sql.KVStringOptRequireNoValue,

pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
@@ -162,7 +166,8 @@ var mysqlOutAllowedOptions = makeStringSet(
)
var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit)
var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit,
pgDumpIgnoreAllUnsupported)

// DROP is required because the target table needs to be taken offline during
// IMPORT INTO.
@@ -606,6 +611,9 @@ func importPlanHook(
maxRowSize = int32(sz)
}
format.PgDump.MaxRowSize = maxRowSize
if _, ok := opts[pgDumpIgnoreAllUnsupported]; ok {
format.PgDump.IgnoreUnsupported = true
}

if override, ok := opts[csvRowLimit]; ok {
rowLimit, err := strconv.Atoi(override)
@@ -1279,7 +1287,8 @@ func parseAndCreateBundleTableDescs(
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner)
tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID,
walltime, fks, int(format.PgDump.MaxRowSize), owner, format.PgDump.IgnoreUnsupported)
default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
68 changes: 45 additions & 23 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1018,29 +1018,6 @@ END;
data: "create table s.t (i INT8)",
err: `non-public schemas unsupported: s`,
},
{
name: "various create ignores",
typ: "PGDUMP",
data: `
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;
CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
AS $_$
SELECT $1 ~ '^[0-9]+$'
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;
CREATE TABLE t (i INT8);
`,
query: map[string][][]string{
getTablesQuery: {{"public", "t", "table"}},
},
},
{
name: "many tables",
typ: "PGDUMP",
@@ -5417,6 +5394,51 @@ func TestImportPgDump(t *testing.T) {
})
}

func TestImportPgDumpIgnoredStmts(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1 /* nodes */, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

data := `
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;
CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
AS $_$
SELECT $1 ~ '^[0-9]+$'
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;
CREATE TABLE t (i INT8);
COMMENT ON TABLE t IS 'This should be skipped';
COMMENT ON DATABASE t IS 'This should be skipped';
COMMENT ON COLUMN t IS 'This should be skipped';
`

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()
t.Run("ignore-unsupported", func(t *testing.T) {
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", srv.URL)
})

t.Run("dont-ignore-unsupported", func(t *testing.T) {
sqlDB.ExpectErr(t, "syntax error", "IMPORT PGDUMP ($1)", srv.URL)
})
}

// TestImportPgDumpGeo tests that a file with SQLFn classes can be
// imported. These are functions like AddGeometryColumn which create and
// execute SQL when called (!). They are, for example, used by shp2pgsql
78 changes: 44 additions & 34 deletions pkg/ccl/importccl/read_import_pgdump.go
@@ -43,16 +43,17 @@ import (
)

type postgreStream struct {
s *bufio.Scanner
copy *postgreStreamCopy
s *bufio.Scanner
copy *postgreStreamCopy
ignoreUnsupportedStmts bool
}

// newPostgreStream returns a struct that can stream statements from an
// io.Reader.
func newPostgreStream(r io.Reader, max int) *postgreStream {
func newPostgreStream(r io.Reader, max int, ignoreUnsupportedStmts bool) *postgreStream {
s := bufio.NewScanner(r)
s.Buffer(nil, max)
p := &postgreStream{s: s}
p := &postgreStream{s: s, ignoreUnsupportedStmts: ignoreUnsupportedStmts}
s.Split(p.split)
return p
}
@@ -96,14 +97,19 @@ func (p *postgreStream) Next() (interface{}, error) {

for p.s.Scan() {
t := p.s.Text()
// Regardless if we can parse the statement, check that it's not something
// we want to ignore.
if isIgnoredStatement(t) {
continue
}
skipOverComments(t)

stmts, err := parser.Parse(t)
if err != nil {
// There are some statements which CRDB is unable to parse but we have
// explicitly marked as "to be skipped" during a PGDUMP import.
// TODO(adityamaru): Write these to a shunt file to see what has been
// skipped.
if unsupported, ok := err.(*tree.Unsupported); ok {
if unsupported.SkipDuringImportPGDump && p.ignoreUnsupportedStmts {
continue
}
}
return nil, err
}
switch len(stmts) {
@@ -146,20 +152,10 @@ func (p *postgreStream) Next() (interface{}, error) {
}

var (
ignoreComments = regexp.MustCompile(`^\s*(--.*)`)
ignoreStatements = []*regexp.Regexp{
regexp.MustCompile("(?i)^alter function"),
regexp.MustCompile("(?i)^alter sequence .* owned by"),
regexp.MustCompile("(?i)^comment on"),
regexp.MustCompile("(?i)^create extension"),
regexp.MustCompile("(?i)^create function"),
regexp.MustCompile("(?i)^create trigger"),
regexp.MustCompile("(?i)^grant .* on sequence"),
regexp.MustCompile("(?i)^revoke .* on sequence"),
}
ignoreComments = regexp.MustCompile(`^\s*(--.*)`)
)

func isIgnoredStatement(s string) bool {
func skipOverComments(s string) {
// Look for the first line with no whitespace or comments.
for {
m := ignoreComments.FindStringIndex(s)
@@ -168,13 +164,6 @@ func isIgnoredStatement(s string) bool {
}
s = s[m[1]:]
}
s = strings.TrimSpace(s)
for _, re := range ignoreStatements {
if re.MatchString(s) {
return true
}
}
return false
}

type regclassRewriter struct{}
@@ -230,6 +219,7 @@ func readPostgresCreateTable(
fks fkHandler,
max int,
owner security.SQLUsername,
ignoreUnsupportedStmts bool,
) ([]*tabledesc.Mutable, error) {
// Modify the CreateTable stmt with the various index additions. We do this
// instead of creating a full table descriptor first and adding indexes
@@ -240,7 +230,7 @@
createTbl := make(map[string]*tree.CreateTable)
createSeq := make(map[string]*tree.CreateSequence)
tableFKs := make(map[string][]*tree.ForeignKeyConstraintTableDef)
ps := newPostgreStream(input, max)
ps := newPostgreStream(input, max, ignoreUnsupportedStmts)
for {
stmt, err := ps.Next()
if err == io.EOF {
@@ -311,7 +301,8 @@ if err != nil {
if err != nil {
return nil, errors.Wrap(err, "postgres parse error")
}
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p, parentID); err != nil {
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, stmt, p,
parentID, ignoreUnsupportedStmts); err != nil {
return nil, err
}
}
@@ -328,6 +319,7 @@
stmt interface{},
p sql.JobExecContext,
parentID descpb.ID,
ignoreUnsupportedStmts bool,
) error {
switch stmt := stmt.(type) {
case *tree.CreateTable:
@@ -439,6 +431,11 @@
if match == "" || match == name {
createSeq[name] = stmt
}
case *tree.AlterSequence:
if len(stmt.Options) != 1 || stmt.Options[0].Name != tree.SeqOptOwnedBy ||
!ignoreUnsupportedStmts {
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
}
// Some SELECT statements mutate schema. Search for those here.
case *tree.Select:
switch sel := stmt.Select.(type) {
@@ -494,7 +491,8 @@
for _, fnStmt := range fnStmts {
switch ast := fnStmt.AST.(type) {
case *tree.AlterTable:
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq, tableFKs, ast, p, parentID); err != nil {
if err := readPostgresStmt(ctx, evalCtx, match, fks, createTbl, createSeq,
tableFKs, ast, p, parentID, ignoreUnsupportedStmts); err != nil {
return err
}
default:
@@ -535,13 +533,22 @@
return err
}
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn:
// Ignore stmts that are parseable but un-implemented if specified by the
// user.
// TODO(adityamaru): Write to a shunt file to indicate to the user what has
// been skipped.
if !ignoreUnsupportedStmts {
return errors.Errorf("unsupported %T statement: %s", stmt, stmt)
}
case *tree.BeginTransaction, *tree.CommitTransaction:
// ignore txns.
case *tree.SetVar, *tree.Insert, *tree.CopyFrom, copyData, *tree.Delete:
// ignore SETs and DMLs.
case *tree.Analyze:
// ANALYZE is syntactic sugar for CreateStatistics. It can be ignored because
// the auto stats stuff will pick up the changes and run if needed.
// ANALYZE is syntactic sugar for CreateStatistics. It can be ignored because
// the auto stats stuff will pick up the changes and run if needed.
case error:
if !errors.Is(stmt, errCopyDone) {
return stmt
@@ -655,7 +662,7 @@ func (m *pgDumpReader) readFile(
tableNameToRowsProcessed := make(map[string]int64)
var inserts, count int64
rowLimit := m.opts.RowLimit
ps := newPostgreStream(input, int(m.opts.MaxRowSize))
ps := newPostgreStream(input, int(m.opts.MaxRowSize), m.opts.IgnoreUnsupported)
semaCtx := tree.MakeSemaContext()
for _, conv := range m.tables {
conv.KvBatch.Source = inputIdx
@@ -910,6 +917,9 @@ func (m *pgDumpReader) readFile(
default:
return errors.Errorf("unsupported function: %s", funcName)
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.AlterSequence:
// parseable but ignored during schema extraction.
case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze:
// ignored.
case *tree.CreateTable, *tree.AlterTable, *tree.CreateIndex, *tree.CreateSequence, *tree.DropTable:
6 changes: 4 additions & 2 deletions pkg/ccl/importccl/read_import_pgdump_test.go
@@ -32,7 +32,8 @@ select '123456789012345678901234567890123456789012345678901234567890123456789012
--
`

p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer)
p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer,
false /* ignoreUnsupportedStmts */)
var sb strings.Builder
for {
s, err := p.Next()
@@ -114,7 +115,8 @@ COPY public.t (s) FROM stdin;
--
`

p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer)
p := newPostgreStream(strings.NewReader(sql), defaultScanBuffer,
false /* ignoreUnsupportedStmts */)
var sb strings.Builder
for {
s, err := p.Next()