Skip to content

Commit

Permalink
importccl: add option to log unsupported stmts
Browse files Browse the repository at this point in the history
This is the last commit, which adds an `ignored_stmt_log` option to
IMPORT PGDUMP. This option specifies the destination to which we will log
the statements that we skip over during an import. This option can only be
used in conjunction with `ignore_unsupported`; otherwise the IMPORT will
fail.

Currently, we will write to two files during the import: one during the
schema parsing phase, and another during the data ingestion phase. The
files will be called:

`unsupported-data-stmts`: Contains unparseable stmts, and unsupported
DML stmts.

`unsupported-schema-stmts`: Contains unparseable stmts, and unsupported
DDL stmts.

Release note (sql change): New IMPORT PGDUMP option `ignored_stmt_log`
that allows users to specify where they would like to log stmts that
have been skipped during an import, by virtue of being unsupported.
  • Loading branch information
adityamaru committed Feb 12, 2021
1 parent 2e3cc52 commit 0d99c8e
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 183 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ func makeInputConverter(
return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_PgDump:
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables,
spec.Format.PgDump.IgnoreUnsupported, evalCtx)
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh,
nil /* seqChunkProvider */)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
Expand Down Expand Up @@ -953,8 +952,8 @@ func pgDumpFormat() roachpb.IOFileFormat {
return roachpb.IOFileFormat{
Format: roachpb.IOFileFormat_PgDump,
PgDump: roachpb.PgDumpOptions{
MaxRowSize: 64 * 1024,
//IgnoreUnsupported: true,
MaxRowSize: 64 * 1024,
IgnoreUnsupported: true,
},
}
}
Expand Down
80 changes: 77 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package importccl

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -99,7 +100,11 @@ const (
avroSchema = "schema"
avroSchemaURI = "schema_uri"

pgDumpIgnoreAllUnsupported = "ignore_unsupported"
pgDumpIgnoreAllUnsupported = "ignore_unsupported"
pgDumpIgnoreShuntFileDest = "ignored_stmt_log"
pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts"
pgDumpUnsupportedDataStmtLog = "unsupported_data-_stmts"
pgDumpMaxLoggedStmts = 10

// RunningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed
Expand Down Expand Up @@ -137,6 +142,7 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroJSONRecords: sql.KVStringOptRequireNoValue,

pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue,
pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
Expand Down Expand Up @@ -167,7 +173,7 @@ var mysqlOutAllowedOptions = makeStringSet(
var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit)
var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit,
pgDumpIgnoreAllUnsupported)
pgDumpIgnoreAllUnsupported, pgDumpIgnoreShuntFileDest)

// DROP is required because the target table needs to be taken offline during
// IMPORT INTO.
Expand Down Expand Up @@ -616,6 +622,13 @@ func importPlanHook(
format.PgDump.IgnoreUnsupported = true
}

if dest, ok := opts[pgDumpIgnoreShuntFileDest]; ok {
if !format.PgDump.IgnoreUnsupported {
return errors.New("cannot log unsupported PGDUMP stmts without `ignore_unsupported` option")
}
format.PgDump.IgnoreUnsupportedLog = dest
}

if override, ok := opts[csvRowLimit]; ok {
rowLimit, err := strconv.Atoi(override)
if err != nil {
Expand Down Expand Up @@ -1257,6 +1270,52 @@ func (r *importResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

// unsupportedStmtConfig stores information that controls how we handle
// unsupported PGDUMP SQL statements seen during the import. It also
// accumulates the log text and the ignored-statement count that are reported
// when the log is flushed.
type unsupportedStmtConfig struct {
// Values are initialized based on the options specified in the IMPORT PGDUMP
// stmt.
//
// ignoreUnsupported is populated from the PGDUMP format's IgnoreUnsupported
// setting (the `ignore_unsupported` option).
ignoreUnsupported bool
// ignoreUnsupportedLogDest is populated from the PGDUMP format's
// IgnoreUnsupportedLog setting (the `ignored_stmt_log` option). An empty
// value means nothing is flushed to external storage.
ignoreUnsupportedLogDest string

// logBuffer holds the string to be flushed to the ignoreUnsupportedLogDest.
logBuffer *bytes.Buffer
// numIgnoredStmts is the total reported in the "Logging N out of M ignored
// statements" summary line.
// NOTE(review): presumably incremented by the readers each time a statement
// is skipped — confirm against the code that populates it.
numIgnoredStmts int
// skipLoggingParseErr skips logging stmts that are ignored because we failed
// to parse them from the dump file. This is used to avoid logging duplicates
// during the schema extraction and data ingestion phases of import.
skipLoggingParseErr bool
}

// logIgnoredStmtsDuringSchemaParsing flushes the statements that were ignored
// during the schema parsing phase of an IMPORT PGDUMP to the
// pgDumpUnsupportedSchemaStmtLog file under cfg.ignoreUnsupportedLogDest.
//
// It is a no-op when no log destination was configured. Otherwise it appends
// a summary line to cfg.logBuffer stating how many of the ignored statements
// are reported as logged (capped at pgDumpMaxLoggedStmts) out of the total,
// and then writes the buffer's contents to external storage opened on behalf
// of p.User().
//
// Any failure to open the destination or to write the file is returned
// wrapped with context about the IMPORT PGDUMP logging step.
func logIgnoredStmtsDuringSchemaParsing(
	ctx context.Context, p sql.JobExecContext, cfg *unsupportedStmtConfig,
) error {
	if cfg.ignoreUnsupportedLogDest == "" {
		return nil
	}

	// The summary never reports more logged statements than the cap allows.
	numLoggedStmts := pgDumpMaxLoggedStmts
	if cfg.numIgnoredStmts < pgDumpMaxLoggedStmts {
		numLoggedStmts = cfg.numIgnoredStmts
	}
	// Format straight into the buffer rather than building an intermediate
	// string first.
	fmt.Fprintf(cfg.logBuffer, "\nLogging %d out of %d ignored statements.\n",
		numLoggedStmts, cfg.numIgnoredStmts)

	dest, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx,
		cfg.ignoreUnsupportedLogDest, p.User())
	if err != nil {
		return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
	}
	defer dest.Close()

	if err := dest.WriteFile(ctx, pgDumpUnsupportedSchemaStmtLog,
		bytes.NewReader(cfg.logBuffer.Bytes())); err != nil {
		return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
	}

	return nil
}

// parseAndCreateBundleTableDescs parses and creates the table
// descriptors for bundle formats.
func parseAndCreateBundleTableDescs(
Expand Down Expand Up @@ -1306,8 +1365,23 @@ func parseAndCreateBundleTableDescs(
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext

// Setup config to handle unsupported DDL statements in the PGDUMP file.
unsupportedCfg := unsupportedStmtConfig{
ignoreUnsupported: format.PgDump.IgnoreUnsupported,
logBuffer: new(bytes.Buffer),
ignoreUnsupportedLogDest: format.PgDump.IgnoreUnsupportedLog}
unsupportedCfg.logBuffer.WriteString("Unsupported statements during schema parse phase:\n\n")

tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID,
walltime, fks, int(format.PgDump.MaxRowSize), owner, format.PgDump.IgnoreUnsupported)
walltime, fks, int(format.PgDump.MaxRowSize), owner, &unsupportedCfg)

// Maybe flush ignored statements to the log destination.
logErr := logIgnoredStmtsDuringSchemaParsing(ctx, p, &unsupportedCfg)
if logErr != nil {
return nil, logErr
}

default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
Expand Down
95 changes: 86 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ END;
name: "fk",
typ: "PGDUMP",
data: testPgdumpFk,
with: "WITH ignore_unsupported",
query: map[string][][]string{
getTablesQuery: {
{"public", "cities", "table"},
Expand Down Expand Up @@ -896,7 +897,7 @@ END;
name: "fk-skip",
typ: "PGDUMP",
data: testPgdumpFk,
with: `WITH skip_foreign_keys`,
with: `WITH skip_foreign_keys, ignore_unsupported`,
query: map[string][][]string{
getTablesQuery: {
{"public", "cities", "table"},
Expand All @@ -911,13 +912,14 @@ END;
name: "fk unreferenced",
typ: "TABLE weather FROM PGDUMP",
data: testPgdumpFk,
with: "WITH ignore_unsupported",
err: `table "cities" not found`,
},
{
name: "fk unreferenced skipped",
typ: "TABLE weather FROM PGDUMP",
data: testPgdumpFk,
with: `WITH skip_foreign_keys`,
with: `WITH skip_foreign_keys, ignore_unsupported`,
query: map[string][][]string{
getTablesQuery: {{"public", "weather", "table"}},
},
Expand Down Expand Up @@ -5671,13 +5673,20 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(conn)

data := `
-- Statements that CRDB cannot parse.
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;
CREATE TABLE foo (id INT);
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
-- Valid statement.
CREATE TABLE foo (id INT);
CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
Expand All @@ -5686,15 +5695,21 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;
INSERT INTO foo VALUES (1), (2), (3);
-- Valid statements.
INSERT INTO foo VALUES (1), (2), (3);
CREATE TABLE t (i INT8);
-- Statements that CRDB can parse, but IMPORT does not support.
-- These are processed during the schema pass of IMPORT.
COMMENT ON TABLE t IS 'This should be skipped';
COMMENT ON DATABASE t IS 'This should be skipped';
COMMENT ON COLUMN t IS 'This should be skipped';
COMMENT ON EXTENSION;
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
-- Statements that CRDB can parse, but IMPORT does not support.
-- These are processed during the data ingestion pass of IMPORT.
SELECT pg_catalog.set_config('search_path', '', false);
DELETE FROM geometry_columns WHERE f_table_name = 'nyc_census_blocks' AND f_table_schema = 'public';
`

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -5704,16 +5719,78 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
}))
defer srv.Close()
t.Run("ignore-unsupported", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo; USE foo;")
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", srv.URL)
// Check that statements which are not expected to be ignored, are still
// processed.
sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}})
sqlDB.Exec(t, "DROP TABLE foo")
})

t.Run("dont-ignore-unsupported", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo1; USE foo1;")
sqlDB.ExpectErr(t, "syntax error", "IMPORT PGDUMP ($1)", srv.URL)
})

t.Run("require-both-unsupported-options", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo2; USE foo2;")
ignoredLog := `userfile:///ignore.log`
sqlDB.ExpectErr(t, "cannot log unsupported PGDUMP stmts without `ignore_unsupported` option",
"IMPORT PGDUMP ($1) WITH ignored_stmt_log=$2", srv.URL, ignoredLog)
})

t.Run("log-unsupported-stmts", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo3; USE foo3;")
ignoredLog := `userfile:///ignore.log`
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported, ignored_stmt_log=$2",
srv.URL, ignoredLog)
// Check that statements which are not expected to be ignored, are still
// processed.
sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}})

// Read the unsupported log and verify its contents.
store, err := cloudimpl.ExternalStorageFromURI(ctx, ignoredLog,
base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
security.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB())
require.NoError(t, err)
defer store.Close()
content, err := store.ReadFile(ctx, pgDumpUnsupportedSchemaStmtLog)
require.NoError(t, err)
descBytes, err := ioutil.ReadAll(content)
require.NoError(t, err)
expectedSchemaLog := `Unsupported statements during schema parse phase:
create trigger: could not be parsed
revoke privileges on sequence: could not be parsed
revoke privileges on sequence: could not be parsed
grant privileges on sequence: could not be parsed
grant privileges on sequence: could not be parsed
comment on extension: could not be parsed
create extension if not exists with: could not be parsed
create function: could not be parsed
alter function: could not be parsed
COMMENT ON TABLE t IS 'This should be skipped': unsupported by IMPORT
Logging 10 out of 13 ignored statements.
`
require.Equal(t, []byte(expectedSchemaLog), descBytes)

expectedDataLog := `Unsupported statements during data ingestion phase:
unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT
unsupported *tree.Delete statement: DELETE FROM geometry_columns WHERE (f_table_name = 'nyc_census_blocks') AND (f_table_schema = 'public'): unsupported by IMPORT
Logging 2 out of 2 ignored statements.
`

content, err = store.ReadFile(ctx, pgDumpUnsupportedDataStmtLog)
require.NoError(t, err)
descBytes, err = ioutil.ReadAll(content)
require.NoError(t, err)
require.Equal(t, []byte(expectedDataLog), descBytes)
})
}

// TestImportPgDumpGeo tests that a file with SQLFn classes can be
Expand Down
Loading

0 comments on commit 0d99c8e

Please sign in to comment.