Skip to content

Commit

Permalink
importccl: add option to log unsupported stmts
Browse files Browse the repository at this point in the history
This is the last commit; it adds an `ignored_stmt_log` option to
IMPORT PGDUMP. This option specifies the destination to which we will log
the statements that we skip over during an import. This option can only be
used in conjunction with `ignore_unsupported`; otherwise the IMPORT will
fail.

Currently, we will write to two files during the import. One during the
schema parsing phase, and another during the data ingestion phase. The
files will be called:

`unsupported-data-stmts`: Contains unparseable stmts, and unsupported
DML stmts.

`unsupported-schema-stmts`: Contains unparseable stmts, and unsupported
DDL stmts.

Release note (sql change): New IMPORT PGDUMP option `ignored_stmt_log`
that allows users to specify where they would like to log stmts that
have been skipped during an import, by virtue of being unsupported.
  • Loading branch information
adityamaru committed Feb 18, 2021
1 parent 2e3cc52 commit 42f76d6
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 183 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ func makeInputConverter(
return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_PgDump:
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables,
spec.Format.PgDump.IgnoreUnsupported, evalCtx)
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh,
nil /* seqChunkProvider */)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
Expand Down Expand Up @@ -953,8 +952,8 @@ func pgDumpFormat() roachpb.IOFileFormat {
return roachpb.IOFileFormat{
Format: roachpb.IOFileFormat_PgDump,
PgDump: roachpb.PgDumpOptions{
MaxRowSize: 64 * 1024,
//IgnoreUnsupported: true,
MaxRowSize: 64 * 1024,
IgnoreUnsupported: true,
},
}
}
Expand Down
126 changes: 123 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package importccl

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -99,7 +100,11 @@ const (
avroSchema = "schema"
avroSchemaURI = "schema_uri"

pgDumpIgnoreAllUnsupported = "ignore_unsupported"
pgDumpIgnoreAllUnsupported = "ignore_unsupported"
pgDumpIgnoreShuntFileDest = "ignored_stmt_log"
pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts"
pgDumpUnsupportedDataStmtLog = "unsupported_data-_stmts"
pgDumpMaxLoggedStmts = 10

// RunningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed
Expand Down Expand Up @@ -137,6 +142,7 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroJSONRecords: sql.KVStringOptRequireNoValue,

pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue,
pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
Expand Down Expand Up @@ -167,7 +173,7 @@ var mysqlOutAllowedOptions = makeStringSet(
var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit)
var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize)
var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit,
pgDumpIgnoreAllUnsupported)
pgDumpIgnoreAllUnsupported, pgDumpIgnoreShuntFileDest)

// DROP is required because the target table needs to be take offline during
// IMPORT INTO.
Expand Down Expand Up @@ -616,6 +622,13 @@ func importPlanHook(
format.PgDump.IgnoreUnsupported = true
}

if dest, ok := opts[pgDumpIgnoreShuntFileDest]; ok {
if !format.PgDump.IgnoreUnsupported {
return errors.New("cannot log unsupported PGDUMP stmts without `ignore_unsupported` option")
}
format.PgDump.IgnoreUnsupportedLog = dest
}

if override, ok := opts[csvRowLimit]; ok {
rowLimit, err := strconv.Atoi(override)
if err != nil {
Expand Down Expand Up @@ -1257,6 +1270,102 @@ func (r *importResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

// loggerKind identifies which phase of an IMPORT PGDUMP an
// unsupportedStmtLogger is recording statements for.
type loggerKind int

const (
	// schemaParsing marks a logger used while the schema is being parsed.
	schemaParsing = loggerKind(iota)
	// dataIngestion marks a logger used while data is being ingested.
	dataIngestion
)

// unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL
// statements seen during the import. It accumulates a bounded number of log
// lines in memory and writes them to external storage when flushed.
type unsupportedStmtLogger struct {
// Values are initialized based on the options specified in the IMPORT PGDUMP
// stmt.
//
// ignoreUnsupported mirrors the `ignore_unsupported` option.
ignoreUnsupported bool
// ignoreUnsupportedLogDest is the `ignored_stmt_log` destination URI. When it
// is empty, both log and flush return without doing anything.
ignoreUnsupportedLogDest string
// externalStorage is the factory flush uses to open
// ignoreUnsupportedLogDest.
externalStorage cloud.ExternalStorageFactory

// logBuffer holds the string to be flushed to the ignoreUnsupportedLogDest.
logBuffer *bytes.Buffer
// numIgnoredStmts counts every statement recorded through log, including
// those past the pgDumpMaxLoggedStmts cap that are not written to logBuffer.
numIgnoredStmts int

// loggerType is the import phase this logger serves. It selects the header
// written at construction and the name of the file written by flush.
loggerType loggerKind
}

// makeUnsupportedStmtLogger builds a logger for the given import phase. The
// returned logger's buffer is pre-populated with a phase-specific header so
// that anything logged afterwards is appended below it.
func makeUnsupportedStmtLogger(
	ignoreUnsupported bool,
	unsupportedLogDest string,
	loggerType loggerKind,
	externalStorage cloud.ExternalStorageFactory,
) *unsupportedStmtLogger {
	// Any kind other than dataIngestion gets the schema parse header.
	var header string
	switch loggerType {
	case dataIngestion:
		header = "Unsupported statements during data ingestion phase:\n\n"
	default:
		header = "Unsupported statements during schema parse phase:\n\n"
	}

	return &unsupportedStmtLogger{
		ignoreUnsupported:        ignoreUnsupported,
		ignoreUnsupportedLogDest: unsupportedLogDest,
		externalStorage:          externalStorage,
		logBuffer:                bytes.NewBufferString(header),
		loggerType:               loggerType,
	}
}

// log records a single ignored statement. logLine describes the statement and
// isParseError says whether it was skipped because it could not be parsed (as
// opposed to being parseable but unsupported by IMPORT).
//
// Every recorded statement is counted, but only the first
// pgDumpMaxLoggedStmts of them are written to the buffer.
func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) {
	// Without a destination there is nothing to record.
	if u.ignoreUnsupportedLogDest == "" {
		return
	}
	// Parse errors were already logged by the schema phase logger, so the data
	// ingestion logger skips them to avoid duplicate entries.
	if isParseError && u.loggerType == dataIngestion {
		return
	}

	alreadySeen := u.numIgnoredStmts
	u.numIgnoredStmts++
	if alreadySeen >= pgDumpMaxLoggedStmts {
		return
	}

	if isParseError {
		fmt.Fprintf(u.logBuffer, "%s: could not be parsed\n", logLine)
		return
	}
	fmt.Fprintf(u.logBuffer, "%s: unsupported by IMPORT\n", logLine)
}

// flush appends a summary footer to the buffered log and writes the whole
// buffer to a file under ignoreUnsupportedLogDest. The file is named
// pgDumpUnsupportedDataStmtLog for a dataIngestion logger and
// pgDumpUnsupportedSchemaStmtLog otherwise.
//
// It is a no-op returning nil when no log destination was configured. The
// external storage is opened on behalf of user. Any failure to resolve the
// destination, open the storage, or write the file is returned wrapped, with
// the underlying cause preserved.
//
// Note that the footer is appended to the buffer on every call.
func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error {
	if u.ignoreUnsupportedLogDest == "" {
		return nil
	}

	// At most pgDumpMaxLoggedStmts statements were written to the buffer, even
	// though all of them were counted.
	numLoggedStmts := pgDumpMaxLoggedStmts
	if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
		numLoggedStmts = u.numIgnoredStmts
	}
	fmt.Fprintf(u.logBuffer, "\nLogging %d out of %d ignored statements.\n",
		numLoggedStmts, u.numIgnoredStmts)

	conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user)
	if err != nil {
		return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
	}
	s, err := u.externalStorage(ctx, conf)
	if err != nil {
		// Wrap rather than replace the error so the reason the storage could not
		// be opened is not lost.
		return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
	}
	defer s.Close()

	logFileName := pgDumpUnsupportedSchemaStmtLog
	if u.loggerType == dataIngestion {
		logFileName = pgDumpUnsupportedDataStmtLog
	}
	if err := s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())); err != nil {
		return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP")
	}
	return nil
}

// parseAndCreateBundleTableDescs parses and creates the table
// descriptors for bundle formats.
func parseAndCreateBundleTableDescs(
Expand Down Expand Up @@ -1306,8 +1415,19 @@ func parseAndCreateBundleTableDescs(
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext

// Setup a logger to handle unsupported DDL statements in the PGDUMP file.
unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported,
format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage)

tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID,
walltime, fks, int(format.PgDump.MaxRowSize), owner, format.PgDump.IgnoreUnsupported)
walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger)

logErr := unsupportedStmtLogger.flush(ctx, p.User())
if logErr != nil {
return nil, logErr
}

default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
Expand Down
95 changes: 86 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ END;
name: "fk",
typ: "PGDUMP",
data: testPgdumpFk,
with: "WITH ignore_unsupported",
query: map[string][][]string{
getTablesQuery: {
{"public", "cities", "table"},
Expand Down Expand Up @@ -896,7 +897,7 @@ END;
name: "fk-skip",
typ: "PGDUMP",
data: testPgdumpFk,
with: `WITH skip_foreign_keys`,
with: `WITH skip_foreign_keys, ignore_unsupported`,
query: map[string][][]string{
getTablesQuery: {
{"public", "cities", "table"},
Expand All @@ -911,13 +912,14 @@ END;
name: "fk unreferenced",
typ: "TABLE weather FROM PGDUMP",
data: testPgdumpFk,
with: "WITH ignore_unsupported",
err: `table "cities" not found`,
},
{
name: "fk unreferenced skipped",
typ: "TABLE weather FROM PGDUMP",
data: testPgdumpFk,
with: `WITH skip_foreign_keys`,
with: `WITH skip_foreign_keys, ignore_unsupported`,
query: map[string][][]string{
getTablesQuery: {{"public", "weather", "table"}},
},
Expand Down Expand Up @@ -5671,13 +5673,20 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(conn)

data := `
-- Statements that CRDB cannot parse.
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;
CREATE TABLE foo (id INT);
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
-- Valid statement.
CREATE TABLE foo (id INT);
CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
Expand All @@ -5686,15 +5695,21 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;
INSERT INTO foo VALUES (1), (2), (3);
-- Valid statements.
INSERT INTO foo VALUES (1), (2), (3);
CREATE TABLE t (i INT8);
-- Statements that CRDB can parse, but IMPORT does not support.
-- These are processed during the schema pass of IMPORT.
COMMENT ON TABLE t IS 'This should be skipped';
COMMENT ON DATABASE t IS 'This should be skipped';
COMMENT ON COLUMN t IS 'This should be skipped';
COMMENT ON EXTENSION;
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
-- Statements that CRDB can parse, but IMPORT does not support.
-- These are processed during the data ingestion pass of IMPORT.
SELECT pg_catalog.set_config('search_path', '', false);
DELETE FROM geometry_columns WHERE f_table_name = 'nyc_census_blocks' AND f_table_schema = 'public';
`

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -5704,16 +5719,78 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) {
}))
defer srv.Close()
t.Run("ignore-unsupported", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo; USE foo;")
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported", srv.URL)
// Check that statements which are not expected to be ignored, are still
// processed.
sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}})
sqlDB.Exec(t, "DROP TABLE foo")
})

t.Run("dont-ignore-unsupported", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo1; USE foo1;")
sqlDB.ExpectErr(t, "syntax error", "IMPORT PGDUMP ($1)", srv.URL)
})

t.Run("require-both-unsupported-options", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo2; USE foo2;")
ignoredLog := `userfile:///ignore.log`
sqlDB.ExpectErr(t, "cannot log unsupported PGDUMP stmts without `ignore_unsupported` option",
"IMPORT PGDUMP ($1) WITH ignored_stmt_log=$2", srv.URL, ignoredLog)
})

t.Run("log-unsupported-stmts", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE foo3; USE foo3;")
ignoredLog := `userfile:///ignore.log`
sqlDB.Exec(t, "IMPORT PGDUMP ($1) WITH ignore_unsupported, ignored_stmt_log=$2",
srv.URL, ignoredLog)
// Check that statements which are not expected to be ignored, are still
// processed.
sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}})

// Read the unsupported log and verify its contents.
store, err := cloudimpl.ExternalStorageFromURI(ctx, ignoredLog,
base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
security.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB())
require.NoError(t, err)
defer store.Close()
content, err := store.ReadFile(ctx, pgDumpUnsupportedSchemaStmtLog)
require.NoError(t, err)
descBytes, err := ioutil.ReadAll(content)
require.NoError(t, err)
expectedSchemaLog := `Unsupported statements during schema parse phase:
create trigger: could not be parsed
revoke privileges on sequence: could not be parsed
revoke privileges on sequence: could not be parsed
grant privileges on sequence: could not be parsed
grant privileges on sequence: could not be parsed
comment on extension: could not be parsed
create extension if not exists with: could not be parsed
create function: could not be parsed
alter function: could not be parsed
COMMENT ON TABLE t IS 'This should be skipped': unsupported by IMPORT
Logging 10 out of 13 ignored statements.
`
require.Equal(t, []byte(expectedSchemaLog), descBytes)

expectedDataLog := `Unsupported statements during data ingestion phase:
unsupported 3 fn args in select: ['search_path' '' false]: unsupported by IMPORT
unsupported *tree.Delete statement: DELETE FROM geometry_columns WHERE (f_table_name = 'nyc_census_blocks') AND (f_table_schema = 'public'): unsupported by IMPORT
Logging 2 out of 2 ignored statements.
`

content, err = store.ReadFile(ctx, pgDumpUnsupportedDataStmtLog)
require.NoError(t, err)
descBytes, err = ioutil.ReadAll(content)
require.NoError(t, err)
require.Equal(t, []byte(expectedDataLog), descBytes)
})
}

// TestImportPgDumpGeo tests that a file with SQLFn classes can be
Expand Down
Loading

0 comments on commit 42f76d6

Please sign in to comment.