importccl: cleanup how we ignore stmts in IMPORT PGDUMP #57827

Merged 3 commits on Feb 18, 2021
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_processor_test.go
@@ -116,8 +116,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
-conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh,
-	nil /* seqChunkProvider */)
+conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
@@ -953,7 +952,8 @@ func pgDumpFormat() roachpb.IOFileFormat {
return roachpb.IOFileFormat{
Format: roachpb.IOFileFormat_PgDump,
PgDump: roachpb.PgDumpOptions{
-MaxRowSize: 64 * 1024,
+MaxRowSize:        64 * 1024,
+IgnoreUnsupported: true,
},
}
}
133 changes: 131 additions & 2 deletions pkg/ccl/importccl/import_stmt.go
@@ -9,6 +9,7 @@
package importccl

import (
"bytes"
"context"
"fmt"
"io/ioutil"
@@ -99,6 +100,12 @@ const (
avroSchema = "schema"
avroSchemaURI = "schema_uri"

pgDumpIgnoreAllUnsupported = "ignore_unsupported"
pgDumpIgnoreShuntFileDest = "ignored_stmt_log"
pgDumpUnsupportedSchemaStmtLog = "unsupported_schema_stmts"
pgDumpUnsupportedDataStmtLog = "unsupported_data_stmts"
pgDumpMaxLoggedStmts = 10

// RunningStatusImportBundleParseSchema indicates to the user that a bundle format
// schema is being parsed
runningStatusImportBundleParseSchema jobs.RunningStatus = "parsing schema on Import Bundle"
@@ -133,6 +140,9 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroRecordsSeparatedBy: sql.KVStringOptRequireValue,
avroBinRecords: sql.KVStringOptRequireNoValue,
avroJSONRecords: sql.KVStringOptRequireNoValue,

pgDumpIgnoreAllUnsupported: sql.KVStringOptRequireNoValue,
pgDumpIgnoreShuntFileDest: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
@@ -162,7 +172,8 @@ var mysqlOutAllowedOptions = makeStringSet(
)
var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs, csvRowLimit)
var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize)
-var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit)
+var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs, csvRowLimit,
+	pgDumpIgnoreAllUnsupported, pgDumpIgnoreShuntFileDest)

// DROP is required because the target table needs to be taken offline during
// IMPORT INTO.
@@ -607,6 +618,16 @@ func importPlanHook(
maxRowSize = int32(sz)
}
format.PgDump.MaxRowSize = maxRowSize
if _, ok := opts[pgDumpIgnoreAllUnsupported]; ok {
format.PgDump.IgnoreUnsupported = true
}

if dest, ok := opts[pgDumpIgnoreShuntFileDest]; ok {
if !format.PgDump.IgnoreUnsupported {
return errors.New("cannot log unsupported PGDUMP stmts without `ignore_unsupported` option")
}
format.PgDump.IgnoreUnsupportedLog = dest
}
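
For context, here is a minimal sketch of how the two options wired up above might be used together, assuming a locally running node, the lib/pq driver, and hypothetical userfile URIs (illustrative, not taken from this PR's tests):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// ignore_unsupported skips statements IMPORT cannot handle;
	// ignored_stmt_log names an external-storage destination for them.
	// Passing ignored_stmt_log alone is rejected by the validation above.
	if _, err := db.Exec(`IMPORT PGDUMP 'userfile:///dump.sql'
		WITH ignore_unsupported, ignored_stmt_log = 'userfile:///import-logs'`); err != nil {
		log.Fatal(err)
	}
}
```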

if override, ok := opts[csvRowLimit]; ok {
rowLimit, err := strconv.Atoi(override)
@@ -1249,6 +1270,102 @@ func (r *importResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

type loggerKind int

const (
schemaParsing loggerKind = iota
dataIngestion
)

// unsupportedStmtLogger is responsible for handling unsupported PGDUMP SQL
// statements seen during the import.
type unsupportedStmtLogger struct {
// Values are initialized based on the options specified in the IMPORT PGDUMP
// stmt.
ignoreUnsupported bool
ignoreUnsupportedLogDest string
externalStorage cloud.ExternalStorageFactory

// logBuffer holds the string to be flushed to the ignoreUnsupportedLogDest.
logBuffer *bytes.Buffer
numIgnoredStmts int

loggerType loggerKind
}

func makeUnsupportedStmtLogger(
ignoreUnsupported bool,
unsupportedLogDest string,
loggerType loggerKind,
externalStorage cloud.ExternalStorageFactory,
) *unsupportedStmtLogger {
l := &unsupportedStmtLogger{
ignoreUnsupported: ignoreUnsupported,
ignoreUnsupportedLogDest: unsupportedLogDest,
loggerType: loggerType,
logBuffer: new(bytes.Buffer),
externalStorage: externalStorage,
}
header := "Unsupported statements during schema parse phase:\n\n"
if loggerType == dataIngestion {
header = "Unsupported statements during data ingestion phase:\n\n"
}
l.logBuffer.WriteString(header)
return l
}

func (u *unsupportedStmtLogger) log(logLine string, isParseError bool) {
// Parse errors have already been logged during the schema parsing phase, so
// skip them here to avoid duplicate entries.
skipLoggingParseErr := isParseError && u.loggerType == dataIngestion
if u.ignoreUnsupportedLogDest == "" || skipLoggingParseErr {
return
}

if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
if isParseError {
logLine = fmt.Sprintf("%s: could not be parsed\n", logLine)
} else {
logLine = fmt.Sprintf("%s: unsupported by IMPORT\n", logLine)
}
u.logBuffer.Write([]byte(logLine))
}
u.numIgnoredStmts++
}

func (u *unsupportedStmtLogger) flush(ctx context.Context, user security.SQLUsername) error {
if u.ignoreUnsupportedLogDest == "" {
return nil
}

numLoggedStmts := pgDumpMaxLoggedStmts
if u.numIgnoredStmts < pgDumpMaxLoggedStmts {
numLoggedStmts = u.numIgnoredStmts
}
u.logBuffer.WriteString(fmt.Sprintf("\nLogging %d out of %d ignored statements.\n",
numLoggedStmts, u.numIgnoredStmts))

conf, err := cloudimpl.ExternalStorageConfFromURI(u.ignoreUnsupportedLogDest, user)
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
}
var s cloud.ExternalStorage
if s, err = u.externalStorage(ctx, conf); err != nil {
return errors.Wrap(err, "failed to log unsupported stmts during IMPORT PGDUMP")
}
defer s.Close()

logFileName := pgDumpUnsupportedSchemaStmtLog
if u.loggerType == dataIngestion {
logFileName = pgDumpUnsupportedDataStmtLog
}
err = s.WriteFile(ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
if err != nil {
return errors.Wrap(err, "failed to write unsupported stmts to the log file during IMPORT PGDUMP")
}
return nil
}
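
To make the buffering behavior above concrete, here is a standalone sketch (not code from this PR) of the same cap-and-count pattern: log buffers only the first pgDumpMaxLoggedStmts statements, every ignored statement is still counted, and flush appends a summary line computed from both numbers:

```go
package main

import (
	"bytes"
	"fmt"
)

const maxLoggedStmts = 10 // mirrors pgDumpMaxLoggedStmts

func main() {
	buf := new(bytes.Buffer)
	buf.WriteString("Unsupported statements during schema parse phase:\n\n")

	numIgnored := 0
	for i := 0; i < 25; i++ {
		// Buffer only the first maxLoggedStmts entries; count the rest.
		if numIgnored < maxLoggedStmts {
			fmt.Fprintf(buf, "CREATE TRIGGER trig_%d ...: unsupported by IMPORT\n", i)
		}
		numIgnored++
	}

	// The summary line that flush() appends before writing the file.
	numLogged := maxLoggedStmts
	if numIgnored < maxLoggedStmts {
		numLogged = numIgnored
	}
	fmt.Fprintf(buf, "\nLogging %d out of %d ignored statements.\n",
		numLogged, numIgnored)

	fmt.Print(buf.String())
}
```

Running this prints ten trigger lines followed by "Logging 10 out of 25 ignored statements.", which is the shape of the file flush writes to external storage.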

// parseAndCreateBundleTableDescs parses and creates the table
// descriptors for bundle formats.
func parseAndCreateBundleTableDescs(
@@ -1298,7 +1415,19 @@ func parseAndCreateBundleTableDescs(
tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner, walltime)
case roachpb.IOFileFormat_PgDump:
evalCtx := &p.ExtendedEvalContext().EvalContext
-tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner)
+
+// Set up a logger to handle unsupported DDL statements in the PGDUMP file.
+unsupportedStmtLogger := makeUnsupportedStmtLogger(format.PgDump.IgnoreUnsupported,
+	format.PgDump.IgnoreUnsupportedLog, schemaParsing, p.ExecCfg().DistSQLSrv.ExternalStorage)
+
+tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID,
+	walltime, fks, int(format.PgDump.MaxRowSize), owner, unsupportedStmtLogger)
+
+logErr := unsupportedStmtLogger.flush(ctx, p.User())
+if logErr != nil {
+	return nil, logErr
+}
+
default:
return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String())
}
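
Putting the pieces together: during schema parsing, flush writes a file named unsupported_schema_stmts to the ignored_stmt_log destination. Reconstructed from the format strings above (the specific statements are illustrative), its contents would look roughly like:

```
Unsupported statements during schema parse phase:

CREATE TRIGGER audit_trig ...: unsupported by IMPORT
CREATE FUNCTION audit() ...: could not be parsed

Logging 2 out of 2 ignored statements.
```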